From 5be771842f1b0f1186d32fa5dafb34710ed8cc03 Mon Sep 17 00:00:00 2001 From: Mark Thom Date: Sat, 9 May 2020 00:56:17 -0600 Subject: [PATCH] add stream_property/2 --- src/prolog/clause_types.rs | 9 + src/prolog/lib/builtins.pl | 48 ++++- src/prolog/machine/compile.rs | 2 +- src/prolog/machine/machine_indices.rs | 5 +- src/prolog/machine/mod.rs | 27 ++- src/prolog/machine/streams.rs | 226 ++++++++++++++++++----- src/prolog/machine/system_calls.rs | 252 ++++++++++++++++++++++++-- src/prolog/macros.rs | 1 + 8 files changed, 505 insertions(+), 65 deletions(-) diff --git a/src/prolog/clause_types.rs b/src/prolog/clause_types.rs index 4a8d8338..b4f1ee85 100644 --- a/src/prolog/clause_types.rs +++ b/src/prolog/clause_types.rs @@ -181,6 +181,7 @@ pub enum SystemClauseType { FetchGlobalVar, FetchGlobalVarWithOffset, FileToChars, + FirstStream, FlushOutput, GetByte, GetChar, @@ -221,6 +222,7 @@ pub enum SystemClauseType { NumberToCodes, OpDeclaration, Open, + NextStream, PartialStringTail, PeekByte, PeekChar, @@ -245,6 +247,7 @@ pub enum SystemClauseType { SetOutput, StoreGlobalVar, StoreGlobalVarWithOffset, + StreamProperty, InferenceLevel, CleanUpBlock, EraseBall, @@ -334,6 +337,7 @@ impl SystemClauseType { clause_name!("$fetch_global_var_with_offset") } &SystemClauseType::FileToChars => clause_name!("$file_to_chars"), + &SystemClauseType::FirstStream => clause_name!("$first_stream"), &SystemClauseType::FlushOutput => clause_name!("$flush_output"), &SystemClauseType::GetByte => clause_name!("$get_byte"), &SystemClauseType::GetChar => clause_name!("$get_char"), @@ -390,6 +394,7 @@ impl SystemClauseType { &SystemClauseType::ModuleHeadIsDynamic => clause_name!("$module_head_is_dynamic"), &SystemClauseType::ModuleExists => clause_name!("$module_exists"), &SystemClauseType::ModuleOf => clause_name!("$module_of"), + &SystemClauseType::NextStream => clause_name!("$next_stream"), &SystemClauseType::NoSuchPredicate => clause_name!("$no_such_predicate"), &SystemClauseType::NumberToChars => clause_name!("$number_to_chars"), &SystemClauseType::NumberToCodes => clause_name!("$number_to_codes"), @@ -416,6 +421,7 @@ impl SystemClauseType { &SystemClauseType::SetInput => clause_name!("$set_input"), &SystemClauseType::SetOutput => clause_name!("$set_output"), &SystemClauseType::SetSeed => clause_name!("$set_seed"), + &SystemClauseType::StreamProperty => clause_name!("$stream_property"), &SystemClauseType::StoreGlobalVar => clause_name!("$store_global_var"), &SystemClauseType::StoreGlobalVarWithOffset => { clause_name!("$store_global_var_with_offset") @@ -492,6 +498,8 @@ impl SystemClauseType { ("$current_hostname", 1) => Some(SystemClauseType::CurrentHostname), ("$current_input", 1) => Some(SystemClauseType::CurrentInput), ("$current_output", 1) => Some(SystemClauseType::CurrentOutput), + ("$first_stream", 1) => Some(SystemClauseType::FirstStream), + ("$next_stream", 2) => Some(SystemClauseType::NextStream), ("$flush_output", 1) => Some(SystemClauseType::FlushOutput), ("$del_attr_non_head", 1) => Some(SystemClauseType::DeleteAttribute), ("$del_attr_head", 1) => Some(SystemClauseType::DeleteHeadAttribute), @@ -566,6 +574,7 @@ impl SystemClauseType { ("$set_cp", 1) => Some(SystemClauseType::SetCutPoint(temp_v!(1))), ("$set_input", 1) => Some(SystemClauseType::SetInput), ("$set_output", 1) => Some(SystemClauseType::SetOutput), + ("$stream_property", 3) => Some(SystemClauseType::StreamProperty), ("$inference_level", 2) => Some(SystemClauseType::InferenceLevel), ("$clean_up_block", 1) => Some(SystemClauseType::CleanUpBlock), ("$erase_ball", 0) => Some(SystemClauseType::EraseBall), diff --git a/src/prolog/lib/builtins.pl b/src/prolog/lib/builtins.pl index c318c4a5..091aa73d 100644 --- a/src/prolog/lib/builtins.pl +++ b/src/prolog/lib/builtins.pl @@ -58,9 +58,9 @@ user:term_expansion((:- op(Pred, Spec, [Op | OtherOps])), OpResults) :- put_code/2, put_char/1, put_char/2, read_term/2, read_term/3, repeat/0, retract/1, set_prolog_flag/2, set_input/1, set_output/1, - setof/3, sub_atom/5, subsumes_term/2, - term_variables/2, throw/1, true/0, - unify_with_occurs_check/2, write/1, + setof/3, stream_property/2, sub_atom/5, + subsumes_term/2, term_variables/2, throw/1, + true/0, unify_with_occurs_check/2, write/1, write_canonical/1, write_term/2, write_term/3, writeq/1]). @@ -1259,3 +1259,45 @@ peek_char(C) :- peek_char(S, C) :- '$peek_char'(S, C). + + +check_stream_property(file_name(F), file_name, F) :- + ( var(F) -> true ; atom(F) ). +check_stream_property(mode(M), mode, M) :- + ( var(M) -> true ; lists:member(M, [read, write, append]) ). +check_stream_property(D, direction, D) :- + ( var(D) -> true ; lists:member(D, [input, output, input_output]), ! ). +check_stream_property(alias(A), alias, A) :- + ( var(A) -> true ; atom(A) ). +check_stream_property(position(P), position, P) :- + ( var(P) -> true ; integer(P), P >= 0 ). +check_stream_property(end_of_stream(E), end_of_stream, E) :- + ( var(E) -> true ; lists:member(E, [not, at, past]) ). +check_stream_property(eof_action(A), eof_action, A) :- + ( var(A) -> true ; lists:member(A, [error, eof_code, reset]) ). +check_stream_property(reposition(B), reposition, B) :- + ( var(B) -> true ; lists:member(B, [true, false]) ). +check_stream_property(type(T), type, T) :- + ( var(T) -> true ; lists:member(T, [text, binary]) ). + + +stream_iter_(S, S). +stream_iter_(S, S1) :- + '$next_stream'(S, S0), + stream_iter_(S0, S1). + +stream_iter(S) :- + ( nonvar(S) -> + true + ; '$first_stream'(S0), + stream_iter_(S0, S) + ). + + +stream_property(S, P) :- + ( nonvar(P), \+ check_stream_property(P, _, _) -> + throw(error(domain_error(stream_property, P), stream_property/2)) + ; stream_iter(S), + check_stream_property(P, PropertyName, PropertyValue), + '$stream_property'(S, PropertyName, PropertyValue) + ). diff --git a/src/prolog/machine/compile.rs b/src/prolog/machine/compile.rs index a887cfb6..19b9d1e3 100644 --- a/src/prolog/machine/compile.rs +++ b/src/prolog/machine/compile.rs @@ -119,7 +119,7 @@ fn load_module_from_file( let mut path_buf = fix_filename(wam.indices.atom_tbl.clone(), path_buf)?; let filename = clause_name!(path_buf.to_string_lossy().to_string(), wam.indices.atom_tbl); - let file_handle = Stream::from_file_as_input(File::open(&path_buf).or_else(|_| { + let file_handle = Stream::from_file_as_input(filename.clone(), File::open(&path_buf).or_else(|_| { Err(SessionError::InvalidFileName(filename.clone())) })?); diff --git a/src/prolog/machine/machine_indices.rs b/src/prolog/machine/machine_indices.rs index 3bd730b3..21ff3e0f 100644 --- a/src/prolog/machine/machine_indices.rs +++ b/src/prolog/machine/machine_indices.rs @@ -19,7 +19,7 @@ use indexmap::IndexMap; use std::cell::RefCell; use std::cmp::Ordering; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::convert::TryFrom; use std::fmt; use std::mem; @@ -824,6 +824,7 @@ impl ModuleStub { pub(crate) type ModuleStubDir = IndexMap; pub(crate) type StreamAliasDir = IndexMap; +pub(crate) type StreamDir = BTreeSet; #[derive(Debug)] pub struct IndexStore { @@ -836,6 +837,7 @@ pub struct IndexStore { pub(super) module_dir: ModuleDir, pub(super) modules: ModuleDir, pub(super) op_dir: OpDir, + pub(super) streams: StreamDir, pub(super) stream_aliases: StreamAliasDir, } @@ -924,6 +926,7 @@ impl IndexStore { op_dir: default_op_dir(), modules: ModuleDir::new(), stream_aliases: StreamAliasDir::new(), + streams: StreamDir::new(), } } diff --git a/src/prolog/machine/mod.rs b/src/prolog/machine/mod.rs index 781fa8a3..78e3215a 100644 --- a/src/prolog/machine/mod.rs +++ b/src/prolog/machine/mod.rs @@ -315,7 +315,10 @@ impl Machine { if path.is_file() { let file_src = match File::open(&path) { - Ok(file_handle) => Stream::from_file_as_input(file_handle), + Ok(file_handle) => Stream::from_file_as_input( + clause_name!(".scryerrc"), + file_handle, + ), Err(_) => return, }; @@ -460,6 +463,28 @@ impl Machine { wam.compile_scryerrc(); + wam.current_input_stream.options.alias = Some(clause_name!("user_input")); + + wam.indices.stream_aliases.insert( + clause_name!("user_input"), + wam.current_input_stream.clone(), + ); + + wam.indices.streams.insert( + wam.current_input_stream.clone() + ); + + wam.current_output_stream.options.alias = Some(clause_name!("user_output")); + + wam.indices.stream_aliases.insert( + clause_name!("user_output"), + wam.current_output_stream.clone(), + ); + + wam.indices.streams.insert( + wam.current_output_stream.clone() + ); + wam } diff --git a/src/prolog/machine/streams.rs b/src/prolog/machine/streams.rs index d47d525c..305c0a20 100644 --- a/src/prolog/machine/streams.rs +++ b/src/prolog/machine/streams.rs @@ -6,6 +6,7 @@ use crate::prolog::machine::machine_errors::*; use crate::prolog::machine::machine_indices::*; use crate::prolog::machine::machine_state::*; +use std::cmp::Ordering; use std::cell::RefCell; use std::error::Error; use std::fmt; @@ -26,8 +27,8 @@ impl StreamType { pub(crate) fn as_str(&self) -> &'static str { match self { - StreamType::Binary => "binary_stream", - StreamType::Text => "text_stream", + StreamType::Binary => "binary", + StreamType::Text => "text", } } @@ -48,23 +49,53 @@ pub enum EOFAction { Reset, } +pub enum AtEndOfStream { + Not, + End, + Past +} + +impl AtEndOfStream { + #[inline] + pub(crate) + fn as_str(&self) -> &'static str { + match self { + AtEndOfStream::Not => "not", + AtEndOfStream::Past => "past", + AtEndOfStream::End => "end", + } + } +} + +impl EOFAction { + #[inline] + pub(crate) + fn as_str(&self) -> &'static str { + match self { + EOFAction::EOFCode => "eof_code", + EOFAction::Error => "error", + EOFAction::Reset => "reset", + } + } +} + /* all these streams are closed automatically when the instance is * dropped. */ pub enum StreamInstance { Bytes(Cursor>), DynReadSource(Box), - InputFile(File), - OutputFile(File), + InputFile(ClauseName, File), + OutputFile(ClauseName, File, bool), // File, append. Null, ReadlineStream(ReadlineStream), Stdout, - TcpStream(TcpStream), + TcpStream(ClauseName, TcpStream), } impl Drop for StreamInstance { fn drop(&mut self) { match self { - StreamInstance::TcpStream(ref mut tcp_stream) => { + StreamInstance::TcpStream(_, ref mut tcp_stream) => { tcp_stream.shutdown(Shutdown::Both).unwrap(); } _ => { @@ -80,14 +111,14 @@ impl fmt::Debug for StreamInstance { write!(fmt, "Bytes({:?})", bytes), &StreamInstance::DynReadSource(_) => write!(fmt, "DynReadSource(_)"), // Hacky solution. - &StreamInstance::InputFile(ref file) => write!(fmt, "InputFile({:?})", file), - &StreamInstance::OutputFile(ref file) => write!(fmt, "OutputFile({:?})", file), + &StreamInstance::InputFile(_, ref file) => write!(fmt, "InputFile({:?})", file), + &StreamInstance::OutputFile(_, ref file, _) => write!(fmt, "OutputFile({:?})", file), &StreamInstance::Null => write!(fmt, "Null"), &StreamInstance::ReadlineStream(ref readline_stream) => write!(fmt, "ReadlineStream({:?})", readline_stream), // &StreamInstance::Stdin => write!(fmt, "Stdin"), &StreamInstance::Stdout => write!(fmt, "Stdout"), - &StreamInstance::TcpStream(ref tcp_stream) => + &StreamInstance::TcpStream(_, ref tcp_stream) => write!(fmt, "TcpStream({:?})", tcp_stream), } } @@ -194,6 +225,20 @@ pub struct Stream { stream_inst: WrappedStreamInstance, } +impl PartialOrd for Stream { + #[inline] + fn partial_cmp(&self, other: &Stream) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Stream { + #[inline] + fn cmp(&self, other: &Stream) -> Ordering { + self.as_ptr().cmp(&other.as_ptr()) + } +} + impl PartialEq for Stream { #[inline] fn eq(&self, other: &Self) -> bool { @@ -203,15 +248,6 @@ impl PartialEq for Stream { impl Eq for Stream {} -impl From for Stream { - fn from(tcp_stream: TcpStream) -> Self { - tcp_stream.set_read_timeout(None).unwrap(); - tcp_stream.set_write_timeout(None).unwrap(); - - Stream::from_inst(StreamInstance::TcpStream(tcp_stream)) - } -} - impl From for Stream { fn from(string: String) -> Self { Stream::from_inst(StreamInstance::Bytes(Cursor::new(string.into_bytes()))) @@ -245,6 +281,105 @@ impl Stream { ptr as *const u8 } + #[inline] + pub(crate) + fn position(&mut self) -> Option { + match *self.stream_inst.0.borrow_mut() { + StreamInstance::InputFile(_, ref mut file) => { + file.seek(SeekFrom::Current(0)).ok() + } + _ => { + None + } + } + } + + #[inline] + pub(crate) + fn position_relative_to_end(&mut self) -> AtEndOfStream { + if self.past_end_of_stream { + return AtEndOfStream::Past; + } + + match *self.stream_inst.0.borrow_mut() { + StreamInstance::InputFile(_, ref mut file) => { + match file.metadata() { + Ok(metadata) => { + if let Ok(position) = file.seek(SeekFrom::Current(0)) { + return match position.cmp(&metadata.len()) { + Ordering::Equal => { + AtEndOfStream::End + } + Ordering::Less => { + AtEndOfStream::Not + } + Ordering::Greater => { + self.past_end_of_stream = true; + AtEndOfStream::Past + } + }; + } else { + self.past_end_of_stream = true; + AtEndOfStream::Past + } + } + _ => { + self.past_end_of_stream = true; + AtEndOfStream::Past + } + } + } + _ => { + AtEndOfStream::Not + } + } + } + + #[inline] + pub(crate) + fn file_name(&self) -> Option { + match *self.stream_inst.0.borrow() { + StreamInstance::InputFile(ref name, _) => { + Some(name.clone()) + } + StreamInstance::OutputFile(ref name, ..) => { + Some(name.clone()) + } + StreamInstance::TcpStream(ref name, _) => { + Some(name.clone()) + } + _ => { + None + } + } + } + + #[inline] + pub(crate) + fn mode(&self) -> &'static str { + match *self.stream_inst.0.borrow() { + StreamInstance::Bytes(_) | + StreamInstance::ReadlineStream(_) | + StreamInstance::DynReadSource(_) | + StreamInstance::InputFile(..) => { + "read" + } + StreamInstance::TcpStream(..) => { + "read_append" + } + StreamInstance::OutputFile(_, _, true) => { + "append" + } + StreamInstance::Stdout | + StreamInstance::OutputFile(_, _, false) => { + "write" + } + StreamInstance::Null => { + "" + } + } + } + #[inline] fn from_inst(stream_inst: StreamInstance) -> Self { Stream { @@ -262,14 +397,23 @@ impl Stream { #[inline] pub(crate) - fn from_file_as_output(file: File) -> Self { - Stream::from_inst(StreamInstance::OutputFile(file)) + fn from_tcp_stream(address: ClauseName, tcp_stream: TcpStream) -> Self { + tcp_stream.set_read_timeout(None).unwrap(); + tcp_stream.set_write_timeout(None).unwrap(); + + Stream::from_inst(StreamInstance::TcpStream(address, tcp_stream)) } #[inline] pub(crate) - fn from_file_as_input(file: File) -> Self { - Stream::from_inst(StreamInstance::InputFile(file)) + fn from_file_as_output(name: ClauseName, file: File, in_append_mode: bool) -> Self { + Stream::from_inst(StreamInstance::OutputFile(name, file, in_append_mode)) + } + + #[inline] + pub(crate) + fn from_file_as_input(name: ClauseName, file: File) -> Self { + Stream::from_inst(StreamInstance::InputFile(name, file)) } #[inline] @@ -321,11 +465,11 @@ impl Stream { fn is_input_stream(&self) -> bool { match *self.stream_inst.0.borrow() { // StreamInstance::Stdin | - StreamInstance::TcpStream(_) | + StreamInstance::TcpStream(..) | StreamInstance::Bytes(_) | StreamInstance::ReadlineStream(_) | StreamInstance::DynReadSource(_) | - StreamInstance::InputFile(_) => { + StreamInstance::InputFile(..) => { true } _ => { @@ -339,9 +483,9 @@ impl Stream { fn is_output_stream(&self) -> bool { match *self.stream_inst.0.borrow() { StreamInstance::Stdout - | StreamInstance::TcpStream(_) + | StreamInstance::TcpStream(..) | StreamInstance::Bytes(_) - | StreamInstance::OutputFile(_) => { + | StreamInstance::OutputFile(..) => { true } _ => { @@ -358,7 +502,7 @@ impl Stream { cursor.set_position(0); true } - StreamInstance::InputFile(ref mut file) => { + StreamInstance::InputFile(_, ref mut file) => { file.seek(SeekFrom::Start(0)).unwrap(); true } @@ -393,7 +537,7 @@ impl Stream { } } } - StreamInstance::InputFile(ref mut file) => { + StreamInstance::InputFile(_, ref mut file) => { let mut b = [0u8; 1]; match file.read(&mut b)? { @@ -412,7 +556,7 @@ impl Stream { StreamInstance::ReadlineStream(ref mut stream) => { stream.peek_byte() } - StreamInstance::TcpStream(ref mut tcp_stream) => { + StreamInstance::TcpStream(_, ref mut tcp_stream) => { let mut b = [0u8; 1]; tcp_stream.peek(&mut b)?; Ok(b[0]) @@ -432,7 +576,7 @@ impl Stream { use unicode_reader::CodePoints; match *self.stream_inst.0.borrow_mut() { - StreamInstance::InputFile(ref mut file) => { + StreamInstance::InputFile(_, ref mut file) => { let c = { let mut iter = CodePoints::from(&*file); @@ -453,7 +597,7 @@ impl Stream { StreamInstance::ReadlineStream(ref mut stream) => { stream.peek_char() } - StreamInstance::TcpStream(ref tcp_stream) => { + StreamInstance::TcpStream(_, ref tcp_stream) => { let c = { let mut buf = [0u8; 8]; tcp_stream.peek(&mut buf)?; @@ -832,10 +976,10 @@ impl MachineState { impl Read for Stream { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { match *self.stream_inst.0.borrow_mut() { - StreamInstance::InputFile(ref mut file) => { + StreamInstance::InputFile(_, ref mut file) => { file.read(buf) } - StreamInstance::TcpStream(ref mut tcp_stream) => { + StreamInstance::TcpStream(_, ref mut tcp_stream) => { tcp_stream.read(buf) } StreamInstance::ReadlineStream(ref mut rl_stream) => { @@ -852,7 +996,7 @@ impl Read for Stream { stdin().read(buf) } */ - StreamInstance::OutputFile(_) | StreamInstance::Stdout | StreamInstance::Null => { + StreamInstance::OutputFile(..) | StreamInstance::Stdout | StreamInstance::Null => { Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, @@ -865,10 +1009,10 @@ impl Read for Stream { impl Write for Stream { fn write(&mut self, buf: &[u8]) -> std::io::Result { match *self.stream_inst.0.borrow_mut() { - StreamInstance::OutputFile(ref mut file) => { + StreamInstance::OutputFile(_, ref mut file, _) => { file.write(buf) } - StreamInstance::TcpStream(ref mut tcp_stream) => { + StreamInstance::TcpStream(_, ref mut tcp_stream) => { tcp_stream.write(buf) } StreamInstance::Bytes(ref mut cursor) => { @@ -878,7 +1022,7 @@ impl Write for Stream { stdout().write(buf) } StreamInstance::DynReadSource(_) | StreamInstance::ReadlineStream(_) | - StreamInstance::InputFile(_) | StreamInstance::Null => { + StreamInstance::InputFile(..) | StreamInstance::Null => { Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::WriteToInputStream, @@ -889,10 +1033,10 @@ impl Write for Stream { fn flush(&mut self) -> std::io::Result<()> { match *self.stream_inst.0.borrow_mut() { - StreamInstance::OutputFile(ref mut file) => { + StreamInstance::OutputFile(_, ref mut file, _) => { file.flush() } - StreamInstance::TcpStream(ref mut tcp_stream) => { + StreamInstance::TcpStream(_, ref mut tcp_stream) => { tcp_stream.flush() } StreamInstance::Bytes(ref mut cursor) => { @@ -902,7 +1046,7 @@ impl Write for Stream { stdout().flush() } StreamInstance::DynReadSource(_) | StreamInstance::ReadlineStream(_) | - StreamInstance::InputFile(_) | StreamInstance::Null => { + StreamInstance::InputFile(..) | StreamInstance::Null => { Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::FlushToInputStream, @@ -911,5 +1055,3 @@ impl Write for Stream { } } } - -//TODO: write a Seek instance. diff --git a/src/prolog/machine/system_calls.rs b/src/prolog/machine/system_calls.rs index a97ec40c..73c62e49 100644 --- a/src/prolog/machine/system_calls.rs +++ b/src/prolog/machine/system_calls.rs @@ -21,11 +21,13 @@ use crate::prolog::rug::Integer; use crate::ref_thread_local::RefThreadLocal; use std::cmp; +use std::collections::BTreeSet; use std::convert::TryFrom; use std::io::{ErrorKind, Read, Write}; use std::iter::{once, FromIterator}; use std::fs::{File, OpenOptions}; use std::net::{TcpListener, TcpStream}; +use std::ops::Sub; use std::rc::Rc; use std::time::Duration; @@ -2361,11 +2363,75 @@ impl MachineState { } } } + &SystemClauseType::FirstStream => { + let mut first_stream = None; + let mut null_streams = BTreeSet::new(); + + for stream in indices.streams.iter().cloned() { + if !stream.is_null_stream() { + first_stream = Some(stream); + break; + } else { + null_streams.insert(stream); + } + } + + indices.streams = indices.streams.sub(&null_streams); + + if let Some(first_stream) = first_stream { + let stream = self.heap.to_unifiable(HeapCellValue::Stream(first_stream)); + + let var = self.store(self.deref(self[temp_v!(1)])).as_var().unwrap(); + self.bind(var, stream); + } else { + self.fail = true; + return Ok(()); + } + } + &SystemClauseType::NextStream => { + let prev_stream = + match self.store(self.deref(self[temp_v!(1)])) { + Addr::Stream(h) => { + if let HeapCellValue::Stream(ref stream) = &self.heap[h] { + stream.clone() + } else { + unreachable!() + } + } + _ => { + unreachable!() + } + }; + + let mut next_stream = None; + let mut null_streams = BTreeSet::new(); + + for stream in indices.streams.range(prev_stream.clone() ..).skip(1).cloned() { + if !stream.is_null_stream() { + next_stream = Some(stream); + break; + } else { + null_streams.insert(stream); + } + } + + indices.streams = indices.streams.sub(&null_streams); + + if let Some(next_stream) = next_stream { + let var = self.store(self.deref(self[temp_v!(2)])).as_var().unwrap(); + let next_stream = self.heap.to_unifiable(HeapCellValue::Stream(next_stream)); + + self.bind(var, next_stream); + } else { + self.fail = true; + return Ok(()); + } + } &SystemClauseType::FlushOutput => { let mut stream = self.get_stream_or_alias(self[temp_v!(1)], indices, "flush_output", 1)?; - if stream.is_input_stream() { + if !stream.is_output_stream() { let stub = MachineError::functor_stub(clause_name!("flush_output"), 1); let addr = vec![ @@ -2508,14 +2574,24 @@ impl MachineState { let mut stream = self.get_stream_or_alias(self[temp_v!(1)], indices, "close", 2)?; - if stream.is_output_stream() { + if !stream.is_input_stream() { stream.flush().unwrap(); // 8.11.6.1b) } + indices.streams.remove(&stream); + if stream == *current_input_stream { - *current_input_stream = readline::input_stream(); + *current_input_stream = indices.stream_aliases.get( + &clause_name!("user_input") + ).cloned().unwrap(); + + indices.streams.insert(current_input_stream.clone()); } else if stream == *current_output_stream { - *current_output_stream = Stream::stdout(); + *current_output_stream = indices.stream_aliases.get( + &clause_name!("user_output") + ).cloned().unwrap(); + + indices.streams.insert(current_output_stream.clone()); } stream.close(); @@ -3015,19 +3091,19 @@ impl MachineState { let mut open_options = OpenOptions::new(); - let is_input_file = + let (is_input_file, in_append_mode) = match mode.as_str() { "read" => { open_options.read(true).write(false).create(false); - true + (true, false) } "write" => { open_options.read(false).write(true).truncate(true).create(true); - false + (false, false) } "append" => { open_options.read(false).write(true).create(true).append(true); - false + (false, true) } _ => { let stub = MachineError::functor_stub(clause_name!("open"), 4); @@ -3072,13 +3148,15 @@ impl MachineState { }; let mut stream = if is_input_file { - Stream::from_file_as_input(file) + Stream::from_file_as_input(file_spec, file) } else { - Stream::from_file_as_output(file) + Stream::from_file_as_output(file_spec, file, in_append_mode) }; stream.options = options; + indices.streams.insert(stream.clone()); + if let Some(ref alias) = &stream.options.alias { indices.stream_aliases.insert(alias.clone(), stream.clone()); } @@ -3721,7 +3799,7 @@ impl MachineState { let addr = self.store(self.deref(self[temp_v!(1)])); let stream = self.get_stream_or_alias(addr, indices, "set_input", 1)?; - if stream.is_output_stream() { + if !stream.is_input_stream() { let stub = MachineError::functor_stub( clause_name!("set_input"), 1, @@ -3747,7 +3825,7 @@ impl MachineState { let addr = self.store(self.deref(self[temp_v!(1)])); let stream = self.get_stream_or_alias(addr, indices, "set_output", 1)?; - if stream.is_input_stream() { + if !stream.is_output_stream() { let stub = MachineError::functor_stub( clause_name!("set_input"), 1, @@ -4246,15 +4324,19 @@ impl MachineState { } let stream = - match TcpStream::connect(socket_addr).map_err(|e| e.kind()) { + match TcpStream::connect(&socket_addr).map_err(|e| e.kind()) { Ok(tcp_stream) => { - let mut stream = Stream::from(tcp_stream); + let socket_addr = clause_name!(socket_addr, indices.atom_tbl.clone()); + + let mut stream = Stream::from_tcp_stream(socket_addr, tcp_stream); stream.options = options; if let Some(ref alias) = &stream.options.alias { indices.stream_aliases.insert(alias.clone(), stream.clone()); } + indices.streams.insert(stream.clone()); + self.heap.to_unifiable(HeapCellValue::Stream(stream)) } Err(ErrorKind::PermissionDenied) => { @@ -4383,20 +4465,42 @@ impl MachineState { return Err(self.reposition_error("socket_server_accept", 4)); } + if let Some(ref alias) = &options.alias { + if indices.stream_aliases.contains_key(alias) { + return Err(self.occupied_alias_permission_error( + alias.clone(), + "socket_server_accept", + 4, + )); + } + } + match self.store(self.deref(self[temp_v!(1)])) { Addr::TcpListener(h) => { match &mut self.heap[h] { HeapCellValue::TcpListener(ref mut tcp_listener) => { match tcp_listener.accept().ok() { Some((tcp_stream, socket_addr)) => { - let mut tcp_stream = Stream::from(tcp_stream); + let client = + clause_name!(format!("{}", socket_addr), indices.atom_tbl); + + let mut tcp_stream = + Stream::from_tcp_stream(client.clone(), tcp_stream); + tcp_stream.options = options; + if let Some(ref alias) = &tcp_stream.options.alias { + indices.stream_aliases.insert( + alias.clone(), + tcp_stream.clone(), + ); + } + + indices.streams.insert(tcp_stream.clone()); + let tcp_stream = self.heap.to_unifiable(HeapCellValue::Stream(tcp_stream)); - let client = - clause_name!(format!("{}", socket_addr), indices.atom_tbl); let client = self.heap.to_unifiable(HeapCellValue::Atom(client, None)); @@ -4450,6 +4554,120 @@ impl MachineState { } } } + &SystemClauseType::StreamProperty => { + let mut stream = self.get_stream_or_alias( + self[temp_v!(1)], + indices, + "stream_property", + 2, + )?; + + let property = + match self.store(self.deref(self[temp_v!(2)])) { + Addr::Con(h) if self.heap.atom_at(h) => { + match &self.heap[h] { + HeapCellValue::Atom(ref name, _) => { + match name.as_str() { + "file_name" => { + if let Some(file_name) = stream.file_name() { + HeapCellValue::Atom( + file_name, + None, + ) + } else { + self.fail = true; + return Ok(()); + } + } + "mode" => { + HeapCellValue::Atom( + clause_name!(stream.mode()), + None, + ) + } + "direction" => { + HeapCellValue::Atom( + if stream.is_input_stream() && stream.is_output_stream() { + clause_name!("input_output") + } else if stream.is_input_stream() { + clause_name!("input") + } else { + clause_name!("output") + }, + None, + ) + } + "alias" => { + if let Some(alias) = &stream.options.alias { + HeapCellValue::Atom( + alias.clone(), + None, + ) + } else { + self.fail = true; + return Ok(()); + } + } + "position" => { + if stream.options.reposition { + if let Some(position) = stream.position() { + HeapCellValue::Addr(Addr::Usize(position as usize)) + } else { + unreachable!() + } + } else { + self.fail = true; + return Ok(()); + } + } + "end_of_stream" => { + let end_of_stream_pos = stream.position_relative_to_end(); + + HeapCellValue::Atom( + clause_name!(end_of_stream_pos.as_str()), + None, + ) + } + "eof_action" => { + HeapCellValue::Atom( + clause_name!(stream.options.eof_action.as_str()), + None, + ) + } + "reposition" => { + HeapCellValue::Atom( + clause_name!(if stream.options.reposition { + "true" + } else { + "false" + }), + None, + ) + } + "type" => { + HeapCellValue::Atom( + clause_name!(stream.options.stream_type.as_str()), + None, + ) + } + _ => { + unreachable!() + } + } + } + _ => { + unreachable!() + } + } + } + _ => { + unreachable!() + } + }; + + let property = self.heap.to_unifiable(property); + self.unify(self[temp_v!(3)], property); + } &SystemClauseType::StoreGlobalVar => { let key = self[temp_v!(1)]; diff --git a/src/prolog/macros.rs b/src/prolog/macros.rs index 6a3e46c1..5ac0f450 100644 --- a/src/prolog/macros.rs +++ b/src/prolog/macros.rs @@ -358,6 +358,7 @@ macro_rules! index_store { op_dir: $op_dir, modules: $modules, stream_aliases: StreamAliasDir::new(), + streams: StreamDir::new(), } }; } -- 2.54.0