use super::{
bootstrapping_compile, current_dir, import_builtin_impls, libraries, load_module, Atom,
- CompilationTarget, IndexStore, ListingSource, MachineArgs, MachineState, Stream, StreamOptions,
+ Callback, CompilationTarget, IndexStore, ListingSource, MachineArgs, MachineState, Stream,
+ StreamOptions,
};
/// Describes how the streams of a [`Machine`](crate::Machine) will be handled.
inner: StreamConfigInner::Memory,
}
}
+
+ /// Calls the given callbacks when the respective streams are written to.
+ pub fn with_callbacks(stdout: Option<Callback>, stderr: Option<Callback>) -> Self {
+ StreamConfig {
+ inner: StreamConfigInner::Callbacks { stdout, stderr },
+ }
+ }
}
#[derive(Default)]
Stdio,
#[default]
Memory,
+ Callbacks {
+ stdout: Option<Callback>,
+ stderr: Option<Callback>,
+ },
}
/// Describes how a [`Machine`](crate::Machine) will be configured.
Stream::from_owned_string("".to_owned(), &mut machine_st.arena),
Stream::stderr(&mut machine_st.arena),
),
+ StreamConfigInner::Callbacks { stdout, stderr } => (
+ Stream::Null(StreamOptions::default()),
+ stdout.map_or_else(
+ || Stream::Null(StreamOptions::default()),
+ |x| Stream::from_callback(x, &mut machine_st.arena),
+ ),
+ stderr.map_or_else(
+ || Stream::Null(StreamOptions::default()),
+ |x| Stream::from_callback(x, &mut machine_st.arena),
+ ),
+ ),
};
let mut wam = Machine {
use std::hash::Hash;
use std::io;
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
+use std::mem::ManuallyDrop;
use std::net::{Shutdown, TcpStream};
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
}
}
+pub type Callback = Box<dyn FnMut(&mut Cursor<Vec<u8>>)>;
+
+pub struct CallbackStream {
+ pub(crate) inner: Cursor<Vec<u8>>,
+ callback: Callback,
+}
+
+impl Debug for CallbackStream {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("CallbackStream")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+impl Write for CallbackStream {
+ #[inline]
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let pos = self.inner.position();
+
+ self.inner.seek(SeekFrom::End(0))?;
+ let result = self.inner.write(buf);
+ self.inner.seek(SeekFrom::Start(pos))?;
+
+ result
+ }
+
+ #[inline]
+ fn flush(&mut self) -> std::io::Result<()> {
+ (self.callback)(&mut self.inner);
+ self.inner.flush()
+ }
+}
+
#[bitfield]
#[repr(u64)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
arena_allocated_impl_for_stream!(StaticStringStream, StaticStringStream);
arena_allocated_impl_for_stream!(StandardOutputStream, StandardOutputStream);
arena_allocated_impl_for_stream!(StandardErrorStream, StandardErrorStream);
+arena_allocated_impl_for_stream!(CharReader<CallbackStream>, CallbackStream);
#[derive(Debug, Copy, Clone)]
pub enum Stream {
Readline(TypedArenaPtr<ReadlineStream>),
StandardOutput(TypedArenaPtr<StandardOutputStream>),
StandardError(TypedArenaPtr<StandardErrorStream>),
+ Callback(TypedArenaPtr<CallbackStream>),
}
impl From<TypedArenaPtr<ReadlineStream>> for Stream {
ArenaHeaderTag::Dropped | ArenaHeaderTag::NullStream => {
Stream::Null(StreamOptions::default())
}
+ ArenaHeaderTag::CallbackStream => Stream::Callback(unsafe { ptr.as_typed_ptr() }),
_ => unreachable!(),
}
}
Stream::Readline(ptr) => ptr.header_ptr(),
Stream::StandardOutput(ptr) => ptr.header_ptr(),
Stream::StandardError(ptr) => ptr.header_ptr(),
+ Stream::Callback(ptr) => ptr.header_ptr(),
}
}
Stream::Readline(ref ptr) => &ptr.options,
Stream::StandardOutput(ref ptr) => &ptr.options,
Stream::StandardError(ref ptr) => &ptr.options,
+ Stream::Callback(ref ptr) => &ptr.options,
}
}
Stream::Readline(ref mut ptr) => &mut ptr.options,
Stream::StandardOutput(ref mut ptr) => &mut ptr.options,
Stream::StandardError(ref mut ptr) => &mut ptr.options,
+ Stream::Callback(ref mut ptr) => &mut ptr.options,
}
}
Stream::Readline(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::StandardOutput(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::StandardError(ptr) => ptr.lines_read += incr_num_lines_read,
+ Stream::Callback(ptr) => ptr.lines_read += incr_num_lines_read,
}
}
Stream::Readline(ptr) => ptr.lines_read = value,
Stream::StandardOutput(ptr) => ptr.lines_read = value,
Stream::StandardError(ptr) => ptr.lines_read = value,
+ Stream::Callback(ptr) => ptr.lines_read = value,
}
}
Stream::Readline(ptr) => ptr.lines_read,
Stream::StandardOutput(ptr) => ptr.lines_read,
Stream::StandardError(ptr) => ptr.lines_read,
+ Stream::Callback(ptr) => ptr.lines_read,
}
}
}
Stream::OutputFile(_)
| Stream::StandardError(_)
| Stream::StandardOutput(_)
- | Stream::Null(_) => Some(Err(std::io::Error::new(
+ | Stream::Null(_)
+ | Stream::Callback(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
))),
Stream::OutputFile(_)
| Stream::StandardError(_)
| Stream::StandardOutput(_)
- | Stream::Null(_) => Some(Err(std::io::Error::new(
+ | Stream::Null(_)
+ | Stream::Callback(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
))),
Stream::OutputFile(_)
| Stream::StandardError(_)
| Stream::StandardOutput(_)
- | Stream::Null(_) => {}
+ | Stream::Null(_)
+ | Stream::Callback(_) => {}
}
}
Stream::OutputFile(_)
| Stream::StandardError(_)
| Stream::StandardOutput(_)
- | Stream::Null(_) => {}
+ | Stream::Null(_)
+ | Stream::Callback(_) => {}
}
}
}
Stream::OutputFile(_)
| Stream::StandardError(_)
| Stream::StandardOutput(_)
- | Stream::Null(_) => Err(std::io::Error::new(
+ | Stream::Null(_)
+ | Stream::Callback(_) => Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
)),
#[cfg(feature = "tls")]
Stream::NamedTls(ref mut tls_stream) => tls_stream.get_mut().write(buf),
Stream::Byte(ref mut cursor) => cursor.get_mut().write(buf),
+ Stream::Callback(ref mut callback_stream) => callback_stream.get_mut().write(buf),
Stream::StandardOutput(stream) => stream.write(buf),
Stream::StandardError(stream) => stream.write(buf),
#[cfg(feature = "http")]
#[cfg(feature = "tls")]
Stream::NamedTls(ref mut tls_stream) => tls_stream.stream.get_mut().flush(),
Stream::Byte(ref mut cursor) => cursor.stream.get_mut().flush(),
+ Stream::Callback(ref mut callback_stream) => callback_stream.stream.get_mut().flush(),
Stream::StandardError(stream) => stream.stream.flush(),
Stream::StandardOutput(stream) => stream.stream.flush(),
#[cfg(feature = "http")]
Stream::Readline(stream) => stream.past_end_of_stream,
Stream::StandardOutput(stream) => stream.past_end_of_stream,
Stream::StandardError(stream) => stream.past_end_of_stream,
+ Stream::Callback(stream) => stream.past_end_of_stream,
}
}
Stream::Readline(stream) => stream.past_end_of_stream = value,
Stream::StandardOutput(stream) => stream.past_end_of_stream = value,
Stream::StandardError(stream) => stream.past_end_of_stream = value,
+ Stream::Callback(stream) => stream.past_end_of_stream = value,
}
}
Stream::OutputFile(file) if file.is_append => atom!("append"),
#[cfg(feature = "http")]
Stream::HttpWrite(_) => atom!("write"),
- Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => {
+ Stream::OutputFile(_)
+ | Stream::StandardError(_)
+ | Stream::StandardOutput(_)
+ | Stream::Callback(_) => {
atom!("write")
}
Stream::Null(_) => atom!(""),
))
}
+ #[inline]
+ pub fn from_callback(callback: Callback, arena: &mut Arena) -> Self {
+ Stream::Callback(arena_alloc!(
+ ManuallyDrop::new(StreamLayout::new(CharReader::new(CallbackStream {
+ inner: Cursor::new(Vec::new()),
+ callback,
+ }))),
+ arena
+ ))
+ }
+
#[inline]
pub(crate) fn from_tcp_stream(address: Atom, tcp_stream: TcpStream, arena: &mut Arena) -> Self {
tcp_stream.set_read_timeout(None).unwrap();
stream.drop_payload();
Ok(())
}
+ Stream::Callback(mut stream) => {
+ stream.drop_payload();
+ Ok(())
+ }
Stream::StaticString(mut stream) => {
stream.drop_payload();
Ok(())
| Stream::StandardOutput(_)
| Stream::NamedTcp(..)
| Stream::Byte(_)
+ | Stream::Callback(_)
| Stream::OutputFile(..) => true,
_ => false,
}