-use std::cell::RefCell;
-use std::io::{Seek, SeekFrom, Write};
-use std::rc::Rc;
-use std::{borrow::Cow, io::Cursor};
+use std::borrow::Cow;
+use std::io::Write;
+use std::sync::mpsc::{channel, Receiver, Sender};
use rand::{rngs::StdRng, SeedableRng};
///
/// This also returns a handler to the stdin do the [`Machine`](crate::Machine).
pub fn with_callbacks(stdout: Option<Callback>, stderr: Option<Callback>) -> (UserInput, Self) {
- let stdin = Rc::new(RefCell::new(Cursor::new(Vec::new())));
+ let (sender, receiver) = channel();
(
- UserInput {
- inner: stdin.clone(),
- },
+ UserInput { inner: sender },
StreamConfig {
inner: StreamConfigInner::Callbacks {
- stdin,
+ stdin: receiver,
stdout,
stderr,
},
/// A handler for the stdin of the [`Machine`](crate::Machine).
#[derive(Debug)]
pub struct UserInput {
- inner: Rc<RefCell<Cursor<Vec<u8>>>>,
+ inner: Sender<Vec<u8>>,
}
impl Write for UserInput {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- let mut inner = self.inner.borrow_mut();
- let pos = inner.position();
-
- inner.seek(SeekFrom::End(0))?;
- let result = inner.write(buf);
- inner.seek(SeekFrom::Start(pos))?;
-
- result
+ self.inner
+ .send(buf.into())
+ .map(|_| buf.len())
+ .map_err(|_| std::io::ErrorKind::BrokenPipe.into())
}
fn flush(&mut self) -> std::io::Result<()> {
- self.inner.borrow_mut().flush()
+ Ok(())
}
}
#[default]
Memory,
Callbacks {
- stdin: Rc<RefCell<Cursor<Vec<u8>>>>,
+ stdin: Receiver<Vec<u8>>,
stdout: Option<Callback>,
stderr: Option<Callback>,
},
#[cfg(feature = "http")]
use bytes::{buf::Reader as BufReader, Buf, Bytes};
-use std::cell::RefCell;
use std::cmp::Ordering;
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::ptr;
-use std::rc::Rc;
+use std::sync::mpsc::Receiver;
+use std::sync::mpsc::TryRecvError;
#[cfg(feature = "tls")]
use native_tls::TlsStream;
#[derive(Debug)]
pub struct InputChannelStream {
- pub(crate) inner: Rc<RefCell<Cursor<Vec<u8>>>>,
+ pub(crate) inner: Cursor<Vec<u8>>,
+ pub eof: bool,
+ channel: Receiver<Vec<u8>>,
}
impl Read for InputChannelStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
- self.inner.borrow_mut().read(buf)
+ if self.eof {
+ return Ok(0);
+ }
+
+ let to_read = buf.len();
+ let mut total_read = 0;
+
+ loop {
+ total_read += self.inner.read(&mut buf[total_read..])?;
+
+ if total_read < to_read {
+ // We need to get more data to read
+ match self.channel.try_recv() {
+ Ok(data) => {
+ // Append into self.inner
+ let pos = self.inner.position();
+ assert_eq!(pos as usize, self.inner.get_ref().len());
+ self.inner.write_all(&data)?;
+ self.inner.seek(SeekFrom::Start(pos))?;
+ }
+ Err(TryRecvError::Empty) => {
+ // Data is pending
+ break;
+ }
+ Err(TryRecvError::Disconnected) => {
+ // The other end of the channel was closed so we are EOF
+ self.eof = true;
+ break;
+ }
+ }
+ } else {
+ assert_eq!(total_read, to_read);
+ break;
+ }
+ }
+ Ok(total_read)
}
}
}
#[inline]
- pub fn input_channel(cursor: Rc<RefCell<Cursor<Vec<u8>>>>, arena: &mut Arena) -> Stream {
+ pub fn input_channel(channel: Receiver<Vec<u8>>, arena: &mut Arena) -> Stream {
+ let inner = Cursor::new(Vec::new());
Stream::InputChannel(arena_alloc!(
- StreamLayout::new(CharReader::new(InputChannelStream { inner: cursor })),
+ StreamLayout::new(CharReader::new(InputChannelStream {
+ inner,
+ eof: false,
+ channel
+ })),
arena
))
}
AtEndOfStream::Past
}
}
+ Stream::InputChannel(stream_layout) => {
+ if stream_layout.stream.get_ref().eof {
+ AtEndOfStream::At
+ } else {
+ AtEndOfStream::Not
+ }
+ }
_ => AtEndOfStream::Not,
}
}
readline_stream.reset();
true
}
+ Stream::InputChannel(ref mut input_channel_stream) => {
+ input_channel_stream.stream.get_mut().inner.set_position(0);
+ true
+ }
_ => false,
}
}