use crate::forms::*;
use crate::machine::loader::*;
use crate::machine::machine_state::*;
-use crate::machine::streams::Stream;
+use crate::machine::streams::{Stream, StreamOptions};
use crate::machine::ClauseType;
+use crate::machine::MachineStubGen;
use fxhash::FxBuildHasher;
use indexmap::{IndexMap, IndexSet};
pub(super) meta_predicates: MetaPredicateDir,
pub(super) modules: ModuleDir,
pub(super) op_dir: OpDir,
- pub(super) streams: StreamDir,
- pub(super) stream_aliases: StreamAliasDir,
+ streams: StreamDir,
+ stream_aliases: StreamAliasDir,
}
impl IndexStore {
}
}
+ pub(crate) fn add_stream(
+ &mut self,
+ stream: Stream,
+ stub_name: Atom,
+ stub_arity: usize,
+ ) -> Result<(), MachineStubGen> {
+ if let Some(alias) = stream.options().get_alias() {
+ if self.stream_aliases.contains_key(&alias) {
+ return Err(Box::new(move |machine_st| {
+ machine_st.occupied_alias_permission_error(alias, stub_name, stub_arity)
+ }));
+ }
+
+ self.stream_aliases.insert(alias, stream);
+ }
+
+ self.streams.insert(stream);
+
+ Ok(())
+ }
+
+ pub(crate) fn remove_stream(&mut self, stream: Stream) {
+ if let Some(alias) = stream.options().get_alias() {
+ debug_assert_eq!(self.stream_aliases.get(&alias), Some(&stream));
+ self.stream_aliases.swap_remove(&alias);
+ }
+ self.streams.remove(&stream);
+ }
+
+ pub(crate) fn update_stream_options<F: Fn(&mut StreamOptions)>(
+ &mut self,
+ mut stream: Stream,
+ callback: F,
+ ) {
+ if let Some(prev_alias) = stream.options().get_alias() {
+ debug_assert_eq!(self.stream_aliases.get(&prev_alias), Some(&stream));
+ }
+ let options = stream.options_mut();
+ let prev_alias = options.get_alias();
+
+ callback(options);
+
+ if options.get_alias() != prev_alias {
+ if let Some(prev_alias) = prev_alias {
+ self.stream_aliases.swap_remove(&prev_alias);
+ }
+ if let Some(new_alias) = options.get_alias() {
+ self.stream_aliases.insert(new_alias, stream);
+ }
+ }
+ }
+
+ pub(crate) fn has_stream(&self, alias: Atom) -> bool {
+ self.stream_aliases.contains_key(&alias)
+ }
+
+ pub(crate) fn get_stream(&self, alias: Atom) -> Option<Stream> {
+ self.stream_aliases.get(&alias).copied()
+ }
+
+ pub(crate) fn iter_streams<'a, R: std::ops::RangeBounds<Stream>>(
+ &'a self,
+ range: R,
+ ) -> impl Iterator<Item = Stream> + 'a {
+ self.streams.range(range).into_iter().copied()
+ }
+
+ /// Forcibly sets `alias` to `stream`.
+ /// If there was a previous stream with that alias, it will lose that alias.
+ ///
+ /// Consider using [`add_stream`](Self::add_stream) if you wish to instead
+ /// return an error when stream aliases conflict.
+ pub(crate) fn set_stream(&mut self, alias: Atom, mut stream: Stream) {
+ if let Some(mut prev_stream) = self.get_stream(alias) {
+ if prev_stream == stream {
+ // Nothing to do, as the stream is already present
+ return;
+ }
+
+ prev_stream.options_mut().set_alias_to_atom_opt(None);
+ }
+
+ stream.options_mut().set_alias_to_atom_opt(Some(alias));
+
+ self.stream_aliases.insert(alias, stream);
+ self.streams.insert(stream);
+ }
+
#[inline]
pub(super) fn new() -> Self {
index_store!(
)
}
}
+
+/// A stream is said to have a "protected" alias if modifying its
+/// alias would cause breakage in other parts of the code.
+///
+/// A stream with a protected alias cannot be realiased through
+/// [`IndexStore::update_stream_options`]. Instead, one has to use
+/// [`IndexStore::set_stream`] to do so.
+fn is_protected_alias(alias: Atom) -> bool {
+ alias == atom!("user_input") || alias == atom!("user_output") || alias == atom!("user_error")
+}
use std::cell::Cell;
use std::cmp::Ordering;
-use std::collections::BTreeSet;
use std::convert::TryFrom;
use std::env;
#[cfg(feature = "ffi")]
use std::net::{SocketAddr, ToSocketAddrs};
use std::net::{TcpListener, TcpStream};
use std::num::NonZeroU32;
-use std::ops::Sub;
use std::process;
#[cfg(feature = "http")]
use std::str::FromStr;
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("peek_byte"),
2,
)?;
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("peek_char"),
2,
)?;
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("peek_code"),
2,
)?;
pub(crate) fn put_code(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("put_code"),
2,
)?;
pub(crate) fn put_char(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("put_char"),
2,
)?;
pub(crate) fn put_chars(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("$put_chars"),
2,
)?;
pub(crate) fn put_byte(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("put_byte"),
2,
)?;
pub(crate) fn get_byte(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("get_byte"),
2,
)?;
pub(crate) fn get_char(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("get_char"),
2,
)?;
pub(crate) fn get_n_chars(&mut self) -> CallResult {
let stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("get_n_chars"),
3,
)?;
pub(crate) fn get_code(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("get_code"),
2,
)?;
#[inline(always)]
pub(crate) fn first_stream(&mut self) {
- let mut first_stream = None;
- let mut null_streams = BTreeSet::new();
-
- for stream in self.indices.streams.iter().cloned() {
- if !stream.is_null_stream() {
- first_stream = Some(stream);
- break;
- } else {
- null_streams.insert(stream);
- }
- }
-
- self.indices.streams = self.indices.streams.sub(&null_streams);
+ let first_stream = self
+ .indices
+ .iter_streams(..)
+ .filter(|s| !s.is_null_stream())
+ .next();
if let Some(first_stream) = first_stream {
let stream = stream_as_cell!(first_stream);
#[inline(always)]
pub(crate) fn next_stream(&mut self) {
let prev_stream = cell_as_stream!(self.deref_register(1));
-
- let mut next_stream = None;
- let mut null_streams = BTreeSet::new();
-
- for stream in self.indices.streams.range(prev_stream..).skip(1).cloned() {
- if !stream.is_null_stream() {
- next_stream = Some(stream);
- break;
- } else {
- null_streams.insert(stream);
- }
- }
-
- self.indices.streams = self.indices.streams.sub(&null_streams);
+ let next_stream = self
+ .indices
+ .iter_streams(prev_stream..)
+ .filter(|s| !s.is_null_stream())
+ .skip(1)
+ .next();
if let Some(next_stream) = next_stream {
let var = self.deref_register(2).as_var().unwrap();
pub(crate) fn flush_output(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("flush_output"),
1,
)?;
pub(crate) fn close(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("close"),
2,
)?;
return Ok(());
}
- self.indices.streams.remove(&stream);
-
- if let Some(alias) = stream.options().get_alias() {
- self.indices.stream_aliases.swap_remove(&alias);
- }
+ self.indices.remove_stream(stream);
stream.close().map_err(|_| {
let stub = functor_stub(atom!("close"), 1);
&mut self.machine_st.arena,
);
*stream.options_mut() = StreamOptions::default();
- if let Some(alias) = stream.options().get_alias() {
- self.indices.stream_aliases.insert(alias, stream);
- }
- self.indices.streams.insert(stream);
+ self.indices
+ .add_stream(stream, atom!("http_open"), 3)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
let stream = stream_as_cell!(stream);
);
*stream.options_mut() = StreamOptions::default();
stream.options_mut().set_stream_type(StreamType::Binary);
- self.indices.streams.insert(stream);
+
+ self.indices.add_stream(stream, atom!("http_accept"), 7)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
+
let stream = stream_as_cell!(stream);
let handle: TypedArenaPtr<HttpResponse> = arena_alloc!(request.response, &mut self.machine_st.arena);
);
*stream.options_mut() = StreamOptions::default();
stream.options_mut().set_stream_type(StreamType::Binary);
- self.indices.streams.insert(stream);
+
+
+ self.indices.add_stream(stream, atom!("http_answer"), 4)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
+
let stream = stream_as_cell!(stream);
self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
}
.stream_from_file_spec(file_spec, &mut self.indices, &options)?;
*stream.options_mut() = options;
- self.indices.streams.insert(stream);
- if let Some(alias) = stream.options().get_alias() {
- self.indices.stream_aliases.insert(alias, stream);
- }
+ self.indices
+ .add_stream(stream, atom!("open"), 4)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
let stream_var = self.deref_register(3);
self.machine_st
pub(crate) fn set_stream_options(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("open"),
4,
)?;
pub(crate) fn set_input(&mut self) -> CallResult {
let addr = self.deref_register(1);
- let stream = self.machine_st.get_stream_or_alias(
- addr,
- &self.indices.stream_aliases,
- atom!("set_input"),
- 1,
- )?;
+ let stream =
+ self.machine_st
+ .get_stream_or_alias(addr, &self.indices, atom!("set_input"), 1)?;
if !stream.is_input_stream() {
let stub = functor_stub(atom!("set_input"), 1);
#[inline(always)]
pub(crate) fn set_output(&mut self) -> CallResult {
let addr = self.deref_register(1);
- let stream = self.machine_st.get_stream_or_alias(
- addr,
- &self.indices.stream_aliases,
- atom!("set_output"),
- 1,
- )?;
+ let stream =
+ self.machine_st
+ .get_stream_or_alias(addr, &self.indices, atom!("set_output"), 1)?;
if !stream.is_output_stream() {
let stub = functor_stub(atom!("set_output"), 1);
let stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("read_term"),
3,
)?;
}
if let Some(alias) = options.get_alias() {
- if self.indices.stream_aliases.contains_key(&alias) {
+ if self.indices.has_stream(alias) {
return Err(self.machine_st.occupied_alias_permission_error(
alias,
atom!("socket_client_open"),
*stream.options_mut() = options;
- if let Some(alias) = stream.options().get_alias() {
- self.indices.stream_aliases.insert(alias, stream);
- }
-
- self.indices.streams.insert(stream);
+ self.indices
+ .add_stream(stream, atom!("socket_client_open"), 7)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
stream_as_cell!(stream)
}
return Err(self.machine_st.open_permission_error(
addr,
atom!("socket_client_open"),
- 3,
+ 7,
));
}
Err(ErrorKind::NotFound) => {
}
if let Some(alias) = options.get_alias() {
- if self.indices.stream_aliases.contains_key(&alias) {
+ if self.indices.has_stream(alias) {
return Err(self.machine_st.occupied_alias_permission_error(
alias,
atom!("socket_server_accept"),
*tcp_stream.options_mut() = options;
- if let Some(alias) = &tcp_stream.options().get_alias() {
- self.indices.stream_aliases.insert(*alias, tcp_stream);
- }
-
- self.indices.streams.insert(tcp_stream);
+ self.indices.add_stream(tcp_stream, atom!("socket_server_accept"), 4)
+ .map_err(|stub_gen| {
+ stub_gen(&mut self.machine_st)
+ })?;
let tcp_stream = stream_as_cell!(tcp_stream);
let client = atom_as_cell!(client);
{
let stream0 = self.machine_st.get_stream_or_alias(
self.machine_st.registers[2],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("tls_client_negotiate"),
3,
)?;
let addr = atom!("TLS");
let stream = Stream::from_tls_stream(addr, stream, &mut self.machine_st.arena);
- self.indices.streams.insert(stream);
+
+ self.indices
+ .add_stream(stream, atom!("tls_client_negotiate"), 3)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
self.machine_st.heap.push(stream_as_cell!(stream));
let stream_addr = self.deref_register(3);
let stream0 = self.machine_st.get_stream_or_alias(
self.machine_st.registers[3],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("tls_server_negotiate"),
3,
)?;
};
let stream = Stream::from_tls_stream(atom!("TLS"), stream, &mut self.machine_st.arena);
- self.indices.streams.insert(stream);
+
+ self.indices
+ .add_stream(stream, atom!("tls_server_negotiate"), 3)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
let stream_addr = self.deref_register(4);
self.machine_st
pub(crate) fn set_stream_position(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("set_stream_position"),
2,
)?;
pub(crate) fn stream_property(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("stream_property"),
2,
)?;
pub(crate) fn write_term(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("write_term"),
3,
)?;
pub(crate) fn devour_whitespace(&mut self) -> CallResult {
let mut stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
- &self.indices.stream_aliases,
+ &self.indices,
atom!("$devour_whitespace"),
1,
)?;