-use std::borrow::Cow;
+use std::cell::RefCell;
+use std::io::{Seek, SeekFrom, Write};
+use std::rc::Rc;
+use std::{borrow::Cow, io::Cursor};
use rand::{rngs::StdRng, SeedableRng};
}
/// Calls the given callbacks when the respective streams are written to.
- pub fn with_callbacks(stdout: Option<Callback>, stderr: Option<Callback>) -> Self {
- StreamConfig {
- inner: StreamConfigInner::Callbacks { stdout, stderr },
- }
+ ///
+ /// This also returns a handler to the stdin do the [`Machine`](crate::Machine).
+ pub fn with_callbacks(stdout: Option<Callback>, stderr: Option<Callback>) -> (UserInput, Self) {
+ let stdin = Rc::new(RefCell::new(Cursor::new(Vec::new())));
+ (
+ UserInput {
+ inner: stdin.clone(),
+ },
+ StreamConfig {
+ inner: StreamConfigInner::Callbacks {
+ stdin,
+ stdout,
+ stderr,
+ },
+ },
+ )
+ }
+}
+
+/// A handler for the stdin of the [`Machine`](crate::Machine).
+#[derive(Debug)]
+pub struct UserInput {
+ inner: Rc<RefCell<Cursor<Vec<u8>>>>,
+}
+
+impl Write for UserInput {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let mut inner = self.inner.borrow_mut();
+ let pos = inner.position();
+
+ inner.seek(SeekFrom::End(0))?;
+ let result = inner.write(buf);
+ inner.seek(SeekFrom::Start(pos))?;
+
+ result
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.inner.borrow_mut().flush()
}
}
#[default]
Memory,
Callbacks {
+ stdin: Rc<RefCell<Cursor<Vec<u8>>>>,
stdout: Option<Callback>,
stderr: Option<Callback>,
},
Stream::from_owned_string("".to_owned(), &mut machine_st.arena),
Stream::stderr(&mut machine_st.arena),
),
- StreamConfigInner::Callbacks { stdout, stderr } => (
- Stream::Null(StreamOptions::default()),
+ StreamConfigInner::Callbacks {
+ stdin,
+ stdout,
+ stderr,
+ } => (
+ Stream::input_channel(stdin, &mut machine_st.arena),
stdout.map_or_else(
|| Stream::Null(StreamOptions::default()),
|x| Stream::from_callback(x, &mut machine_st.arena),
#[cfg(feature = "http")]
use bytes::{buf::Reader as BufReader, Buf, Bytes};
+use std::cell::RefCell;
use std::cmp::Ordering;
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::ptr;
+use std::rc::Rc;
#[cfg(feature = "tls")]
use native_tls::TlsStream;
}
}
+#[derive(Debug)]
+pub struct InputChannelStream {
+ pub(crate) inner: Rc<RefCell<Cursor<Vec<u8>>>>,
+}
+
+impl Read for InputChannelStream {
+ #[inline]
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ self.inner.borrow_mut().read(buf)
+ }
+}
+
#[bitfield]
#[repr(u64)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
arena_allocated_impl_for_stream!(StandardOutputStream, StandardOutputStream);
arena_allocated_impl_for_stream!(StandardErrorStream, StandardErrorStream);
arena_allocated_impl_for_stream!(CharReader<CallbackStream>, CallbackStream);
+arena_allocated_impl_for_stream!(CharReader<InputChannelStream>, InputChannelStream);
#[derive(Debug, Copy, Clone)]
pub enum Stream {
StandardOutput(TypedArenaPtr<StandardOutputStream>),
StandardError(TypedArenaPtr<StandardErrorStream>),
Callback(TypedArenaPtr<CallbackStream>),
+ InputChannel(TypedArenaPtr<InputChannelStream>),
}
impl From<TypedArenaPtr<ReadlineStream>> for Stream {
))
}
+ #[inline]
+ pub fn input_channel(cursor: Rc<RefCell<Cursor<Vec<u8>>>>, arena: &mut Arena) -> Stream {
+ Stream::InputChannel(arena_alloc!(
+ StreamLayout::new(CharReader::new(InputChannelStream { inner: cursor })),
+ arena
+ ))
+ }
+
#[inline]
pub fn stdin(arena: &mut Arena, add_history: bool) -> Stream {
Stream::Readline(arena_alloc!(
Stream::Null(StreamOptions::default())
}
ArenaHeaderTag::CallbackStream => Stream::Callback(unsafe { ptr.as_typed_ptr() }),
+ ArenaHeaderTag::InputChannelStream => {
+ Stream::InputChannel(unsafe { ptr.as_typed_ptr() })
+ }
_ => unreachable!(),
}
}
Stream::StandardOutput(ptr) => ptr.header_ptr(),
Stream::StandardError(ptr) => ptr.header_ptr(),
Stream::Callback(ptr) => ptr.header_ptr(),
+ Stream::InputChannel(ptr) => ptr.header_ptr(),
}
}
Stream::StandardOutput(ref ptr) => &ptr.options,
Stream::StandardError(ref ptr) => &ptr.options,
Stream::Callback(ref ptr) => &ptr.options,
+ Stream::InputChannel(ref ptr) => &ptr.options,
}
}
Stream::StandardOutput(ref mut ptr) => &mut ptr.options,
Stream::StandardError(ref mut ptr) => &mut ptr.options,
Stream::Callback(ref mut ptr) => &mut ptr.options,
+ Stream::InputChannel(ref mut ptr) => &mut ptr.options,
}
}
Stream::StandardOutput(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::StandardError(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::Callback(ptr) => ptr.lines_read += incr_num_lines_read,
+ Stream::InputChannel(ptr) => ptr.lines_read += incr_num_lines_read,
}
}
Stream::StandardOutput(ptr) => ptr.lines_read = value,
Stream::StandardError(ptr) => ptr.lines_read = value,
Stream::Callback(ptr) => ptr.lines_read = value,
+ Stream::InputChannel(ptr) => ptr.lines_read = value,
}
}
Stream::StandardOutput(ptr) => ptr.lines_read,
Stream::StandardError(ptr) => ptr.lines_read,
Stream::Callback(ptr) => ptr.lines_read,
+ Stream::InputChannel(ptr) => ptr.lines_read,
}
}
}
Stream::Readline(rl_stream) => (*rl_stream).peek_char(),
Stream::StaticString(src) => (*src).peek_char(),
Stream::Byte(cursor) => (*cursor).peek_char(),
+ Stream::InputChannel(cursor) => (*cursor).peek_char(),
#[cfg(feature = "http")]
Stream::HttpWrite(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
Stream::Readline(rl_stream) => (*rl_stream).read_char(),
Stream::StaticString(src) => (*src).read_char(),
Stream::Byte(cursor) => (*cursor).read_char(),
+ Stream::InputChannel(cursor) => (*cursor).read_char(),
#[cfg(feature = "http")]
Stream::HttpWrite(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
| Stream::StandardOutput(_)
| Stream::Null(_)
| Stream::Callback(_) => {}
+ Stream::InputChannel(_) => {}
}
}
Stream::Readline(ref mut rl_stream) => rl_stream.consume(nread),
Stream::StaticString(ref mut src) => src.consume(nread),
Stream::Byte(ref mut cursor) => cursor.consume(nread),
+ Stream::InputChannel(ref mut cursor) => cursor.consume(nread),
#[cfg(feature = "http")]
Stream::HttpWrite(_) => {}
Stream::OutputFile(_)
Stream::Readline(rl_stream) => (*rl_stream).read(buf),
Stream::StaticString(src) => (*src).read(buf),
Stream::Byte(cursor) => (*cursor).read(buf),
+ Stream::InputChannel(cursor) => (*cursor).read(buf),
#[cfg(feature = "http")]
Stream::HttpWrite(_) => Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::WriteToInputStream,
)),
Stream::StaticString(_)
+ | Stream::InputChannel(_)
| Stream::Readline(_)
| Stream::InputFile(..)
| Stream::Null(_) => Err(std::io::Error::new(
StreamError::FlushToInputStream,
)),
Stream::StaticString(_)
+ | Stream::InputChannel(_)
| Stream::Readline(_)
| Stream::InputFile(_)
| Stream::Null(_) => Err(std::io::Error::new(
Stream::StandardOutput(stream) => stream.past_end_of_stream,
Stream::StandardError(stream) => stream.past_end_of_stream,
Stream::Callback(stream) => stream.past_end_of_stream,
+ Stream::InputChannel(stream) => stream.past_end_of_stream,
}
}
Stream::StandardOutput(stream) => stream.past_end_of_stream = value,
Stream::StandardError(stream) => stream.past_end_of_stream = value,
Stream::Callback(stream) => stream.past_end_of_stream = value,
+ Stream::InputChannel(stream) => stream.past_end_of_stream = value,
}
}
#[cfg(feature = "tls")]
Stream::NamedTls(..) => atom!("read_append"),
Stream::Byte(_)
+ | Stream::InputChannel(_)
| Stream::Readline(_)
| Stream::StaticString(_)
| Stream::InputFile(..) => atom!("read"),
stream.drop_payload();
Ok(())
}
+ Stream::InputChannel(mut stream) => {
+ stream.drop_payload();
+ Ok(())
+ }
Stream::StaticString(mut stream) => {
stream.drop_payload();
Ok(())
Stream::HttpRead(..) => true,
Stream::NamedTcp(..)
| Stream::Byte(_)
+ | Stream::InputChannel(_)
| Stream::Readline(_)
| Stream::StaticString(_)
| Stream::InputFile(..) => true,