summaryrefslogtreecommitdiff
path: root/crates/windlass/src/transport/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/windlass/src/transport/mod.rs')
-rw-r--r--crates/windlass/src/transport/mod.rs30
1 files changed, 15 insertions, 15 deletions
diff --git a/crates/windlass/src/transport/mod.rs b/crates/windlass/src/transport/mod.rs
index f5c5fc3..6f2cc7c 100644
--- a/crates/windlass/src/transport/mod.rs
+++ b/crates/windlass/src/transport/mod.rs
@@ -1,5 +1,6 @@
-use read::{FrameReader, ReceivedFrame, ReceiverError};
+use read::{FrameReader, ReceivedFrame};
use std::{collections::VecDeque, sync::Arc, time::Duration};
+use tokio::io::AsyncReadExt;
use tokio::{
io::{AsyncBufRead, AsyncWrite, AsyncWriteExt},
pin, select, spawn,
@@ -7,7 +8,7 @@ use tokio::{
task::JoinHandle,
time::{sleep_until, Instant},
};
-use tokio_util::sync::CancellationToken;
+use tokio_util::{bytes::BytesMut, sync::CancellationToken};
use crate::encoding::crc16;
@@ -89,9 +90,6 @@ pub enum TransportError {
#[error("message encoding failed: {0}")]
MessageEncode(#[from] MessageEncodeError),
- #[error("receiver error: {0}")]
- Receiver(#[from] ReceiverError),
-
#[error("transmitter error: {0}")]
Transmitter(#[from] TransmitterError),
@@ -144,7 +142,8 @@ impl RttState {
/// State for the task which deals with transport state
#[derive(Debug)]
struct TransportState<R, W> {
- rdr: FrameReader<R>,
+ frdr: FrameReader,
+ rdr: R,
wr: W,
data_send: UnboundedSender<Result<Vec<u8>, TransportError>>,
@@ -176,7 +175,8 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
cancel: CancellationToken,
) -> Self {
Self {
- rdr: FrameReader::new(rdr),
+ frdr: FrameReader::default(),
+ rdr,
wr,
data_send,
cmd_recv,
@@ -199,6 +199,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
}
async fn run(&mut self) -> Result<(), TransportError> {
+ let mut buf = BytesMut::with_capacity(MESSAGE_LENGTH_MAX);
loop {
if self.retransmit_now {
self.retransmit_pending().await?;
@@ -208,6 +209,10 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
self.send_more_frames().await?;
}
+ while let Some(frame) = self.frdr.read_frame() {
+ self.handle_frame(frame);
+ }
+
let retransmit_deadline = self
.inflight_messages
.front()
@@ -220,15 +225,10 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
self.corked_until.map(sleep_until).into();
pin!(corked_timeout);
- // FIXME: This is not correct because read_frame is not cancellation safe
select! {
- frame = self.rdr.read_frame() => {
- let frame = frame?;
- let frame = match frame {
- Some(frame) => frame,
- None => break,
- };
- self.handle_frame(frame);
+ _ = self.rdr.read_buf(&mut buf) => {
+ self.frdr.receive_data(&buf);
+ buf.clear();
},
msg = self.cmd_recv.recv() => {