From dd6533e76c3e41f3ffb2fe6782dcb41210debefa Mon Sep 17 00:00:00 2001 From: bakaq Date: Tue, 28 Jan 2025 17:58:40 -0300 Subject: [PATCH] Add callback streams --- src/arena.rs | 4 ++ src/machine/config.rs | 25 ++++++++++++- src/machine/streams.rs | 84 +++++++++++++++++++++++++++++++++++++++--- src/macros.rs | 1 + 4 files changed, 107 insertions(+), 7 deletions(-) diff --git a/src/arena.rs b/src/arena.rs index e5aceb4d..e3e6cbbd 100644 --- a/src/arena.rs +++ b/src/arena.rs @@ -181,6 +181,7 @@ pub enum ArenaHeaderTag { ReadlineStream = 0b110000, StaticStringStream = 0b110100, ByteStream = 0b111000, + CallbackStream = 0b111001, StandardOutputStream = 0b1100, StandardErrorStream = 0b11000, NullStream = 0b111100, @@ -841,6 +842,9 @@ unsafe fn drop_slab_in_place(value: NonNull, tag: ArenaHeaderTag) { ArenaHeaderTag::ByteStream => { drop_typed_slab_in_place!(ByteStream, value); } + ArenaHeaderTag::CallbackStream => { + drop_typed_slab_in_place!(CallbackStream, value); + } ArenaHeaderTag::LiveLoadState | ArenaHeaderTag::InactiveLoadState => { drop_typed_slab_in_place!(LiveLoadState, value); } diff --git a/src/machine/config.rs b/src/machine/config.rs index 2981899d..34529434 100644 --- a/src/machine/config.rs +++ b/src/machine/config.rs @@ -6,7 +6,8 @@ use crate::Machine; use super::{ bootstrapping_compile, current_dir, import_builtin_impls, libraries, load_module, Atom, - CompilationTarget, IndexStore, ListingSource, MachineArgs, MachineState, Stream, StreamOptions, + Callback, CompilationTarget, IndexStore, ListingSource, MachineArgs, MachineState, Stream, + StreamOptions, }; /// Describes how the streams of a [`Machine`](crate::Machine) will be handled. @@ -31,6 +32,13 @@ impl StreamConfig { inner: StreamConfigInner::Memory, } } + + /// Calls the given callbacks when the respective streams are written to. + pub fn with_callbacks(stdout: Option, stderr: Option) -> Self { + StreamConfig { + inner: StreamConfigInner::Callbacks { stdout, stderr }, + } + } } #[derive(Default)] @@ -38,6 +46,10 @@ enum StreamConfigInner { Stdio, #[default] Memory, + Callbacks { + stdout: Option, + stderr: Option, + }, } /// Describes how a [`Machine`](crate::Machine) will be configured. @@ -90,6 +102,17 @@ impl MachineBuilder { Stream::from_owned_string("".to_owned(), &mut machine_st.arena), Stream::stderr(&mut machine_st.arena), ), + StreamConfigInner::Callbacks { stdout, stderr } => ( + Stream::Null(StreamOptions::default()), + stdout.map_or_else( + || Stream::Null(StreamOptions::default()), + |x| Stream::from_callback(x, &mut machine_st.arena), + ), + stderr.map_or_else( + || Stream::Null(StreamOptions::default()), + |x| Stream::from_callback(x, &mut machine_st.arena), + ), + ), }; let mut wam = Machine { diff --git a/src/machine/streams.rs b/src/machine/streams.rs index b1ecba03..0c180d4a 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -24,6 +24,7 @@ use std::fs::{File, OpenOptions}; use std::hash::Hash; use std::io; use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write}; +use std::mem::ManuallyDrop; use std::net::{Shutdown, TcpStream}; use std::ops::{Deref, DerefMut}; use std::path::PathBuf; @@ -375,6 +376,40 @@ impl Write for StandardErrorStream { } } +pub type Callback = Box>)>; + +pub struct CallbackStream { + pub(crate) inner: Cursor>, + callback: Callback, +} + +impl Debug for CallbackStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CallbackStream") + .field("inner", &self.inner) + .finish() + } +} + +impl Write for CallbackStream { + #[inline] + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let pos = self.inner.position(); + + self.inner.seek(SeekFrom::End(0))?; + let result = self.inner.write(buf); + self.inner.seek(SeekFrom::Start(pos))?; + + result + } + + #[inline] + fn flush(&mut self) -> std::io::Result<()> { + (self.callback)(&mut self.inner); + self.inner.flush() + } +} + #[bitfield] #[repr(u64)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] @@ -500,6 +535,7 @@ arena_allocated_impl_for_stream!(ReadlineStream, ReadlineStream); arena_allocated_impl_for_stream!(StaticStringStream, StaticStringStream); arena_allocated_impl_for_stream!(StandardOutputStream, StandardOutputStream); arena_allocated_impl_for_stream!(StandardErrorStream, StandardErrorStream); +arena_allocated_impl_for_stream!(CharReader, CallbackStream); #[derive(Debug, Copy, Clone)] pub enum Stream { @@ -518,6 +554,7 @@ pub enum Stream { Readline(TypedArenaPtr), StandardOutput(TypedArenaPtr), StandardError(TypedArenaPtr), + Callback(TypedArenaPtr), } impl From> for Stream { @@ -581,6 +618,7 @@ impl Stream { ArenaHeaderTag::Dropped | ArenaHeaderTag::NullStream => { Stream::Null(StreamOptions::default()) } + ArenaHeaderTag::CallbackStream => Stream::Callback(unsafe { ptr.as_typed_ptr() }), _ => unreachable!(), } } @@ -617,6 +655,7 @@ impl Stream { Stream::Readline(ptr) => ptr.header_ptr(), Stream::StandardOutput(ptr) => ptr.header_ptr(), Stream::StandardError(ptr) => ptr.header_ptr(), + Stream::Callback(ptr) => ptr.header_ptr(), } } @@ -637,6 +676,7 @@ impl Stream { Stream::Readline(ref ptr) => &ptr.options, Stream::StandardOutput(ref ptr) => &ptr.options, Stream::StandardError(ref ptr) => &ptr.options, + Stream::Callback(ref ptr) => &ptr.options, } } @@ -657,6 +697,7 @@ impl Stream { Stream::Readline(ref mut ptr) => &mut ptr.options, Stream::StandardOutput(ref mut ptr) => &mut ptr.options, Stream::StandardError(ref mut ptr) => &mut ptr.options, + Stream::Callback(ref mut ptr) => &mut ptr.options, } } @@ -678,6 +719,7 @@ impl Stream { Stream::Readline(ptr) => ptr.lines_read += incr_num_lines_read, Stream::StandardOutput(ptr) => ptr.lines_read += incr_num_lines_read, Stream::StandardError(ptr) => ptr.lines_read += incr_num_lines_read, + Stream::Callback(ptr) => ptr.lines_read += incr_num_lines_read, } } @@ -699,6 +741,7 @@ impl Stream { Stream::Readline(ptr) => ptr.lines_read = value, Stream::StandardOutput(ptr) => ptr.lines_read = value, Stream::StandardError(ptr) => ptr.lines_read = value, + Stream::Callback(ptr) => ptr.lines_read = value, } } @@ -720,6 +763,7 @@ impl Stream { Stream::Readline(ptr) => ptr.lines_read, Stream::StandardOutput(ptr) => ptr.lines_read, Stream::StandardError(ptr) => ptr.lines_read, + Stream::Callback(ptr) => ptr.lines_read, } } } @@ -744,7 +788,8 @@ impl CharRead for Stream { Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) - | Stream::Null(_) => Some(Err(std::io::Error::new( + | Stream::Null(_) + | Stream::Callback(_) => Some(Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, ))), @@ -770,7 +815,8 @@ impl CharRead for Stream { Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) - | Stream::Null(_) => Some(Err(std::io::Error::new( + | Stream::Null(_) + | Stream::Callback(_) => Some(Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, ))), @@ -793,7 +839,8 @@ impl CharRead for Stream { Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) - | Stream::Null(_) => {} + | Stream::Null(_) + | Stream::Callback(_) => {} } } @@ -813,7 +860,8 @@ impl CharRead for Stream { Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) - | Stream::Null(_) => {} + | Stream::Null(_) + | Stream::Callback(_) => {} } } } @@ -839,7 +887,8 @@ impl Read for Stream { Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) - | Stream::Null(_) => Err(std::io::Error::new( + | Stream::Null(_) + | Stream::Callback(_) => Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, )), @@ -855,6 +904,7 @@ impl Write for Stream { #[cfg(feature = "tls")] Stream::NamedTls(ref mut tls_stream) => tls_stream.get_mut().write(buf), Stream::Byte(ref mut cursor) => cursor.get_mut().write(buf), + Stream::Callback(ref mut callback_stream) => callback_stream.get_mut().write(buf), Stream::StandardOutput(stream) => stream.write(buf), Stream::StandardError(stream) => stream.write(buf), #[cfg(feature = "http")] @@ -881,6 +931,7 @@ impl Write for Stream { #[cfg(feature = "tls")] Stream::NamedTls(ref mut tls_stream) => tls_stream.stream.get_mut().flush(), Stream::Byte(ref mut cursor) => cursor.stream.get_mut().flush(), + Stream::Callback(ref mut callback_stream) => callback_stream.stream.get_mut().flush(), Stream::StandardError(stream) => stream.stream.flush(), Stream::StandardOutput(stream) => stream.stream.flush(), #[cfg(feature = "http")] @@ -1043,6 +1094,7 @@ impl Stream { Stream::Readline(stream) => stream.past_end_of_stream, Stream::StandardOutput(stream) => stream.past_end_of_stream, Stream::StandardError(stream) => stream.past_end_of_stream, + Stream::Callback(stream) => stream.past_end_of_stream, } } @@ -1069,6 +1121,7 @@ impl Stream { Stream::Readline(stream) => stream.past_end_of_stream = value, Stream::StandardOutput(stream) => stream.past_end_of_stream = value, Stream::StandardError(stream) => stream.past_end_of_stream = value, + Stream::Callback(stream) => stream.past_end_of_stream = value, } } @@ -1175,7 +1228,10 @@ impl Stream { Stream::OutputFile(file) if file.is_append => atom!("append"), #[cfg(feature = "http")] Stream::HttpWrite(_) => atom!("write"), - Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => { + Stream::OutputFile(_) + | Stream::StandardError(_) + | Stream::StandardOutput(_) + | Stream::Callback(_) => { atom!("write") } Stream::Null(_) => atom!(""), @@ -1198,6 +1254,17 @@ impl Stream { )) } + #[inline] + pub fn from_callback(callback: Callback, arena: &mut Arena) -> Self { + Stream::Callback(arena_alloc!( + ManuallyDrop::new(StreamLayout::new(CharReader::new(CallbackStream { + inner: Cursor::new(Vec::new()), + callback, + }))), + arena + )) + } + #[inline] pub(crate) fn from_tcp_stream(address: Atom, tcp_stream: TcpStream, arena: &mut Arena) -> Self { tcp_stream.set_read_timeout(None).unwrap(); @@ -1325,6 +1392,10 @@ impl Stream { stream.drop_payload(); Ok(()) } + Stream::Callback(mut stream) => { + stream.drop_payload(); + Ok(()) + } Stream::StaticString(mut stream) => { stream.drop_payload(); Ok(()) @@ -1370,6 +1441,7 @@ impl Stream { | Stream::StandardOutput(_) | Stream::NamedTcp(..) | Stream::Byte(_) + | Stream::Callback(_) | Stream::OutputFile(..) => true, _ => false, } diff --git a/src/macros.rs b/src/macros.rs index 30a863ca..9b2cbabe 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -305,6 +305,7 @@ macro_rules! match_untyped_arena_ptr_pat { | ArenaHeaderTag::ReadlineStream | ArenaHeaderTag::StaticStringStream | ArenaHeaderTag::ByteStream + | ArenaHeaderTag::CallbackStream | ArenaHeaderTag::StandardOutputStream | ArenaHeaderTag::StandardErrorStream }; -- 2.54.0