impl Debug for HttpWriteStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Http Write Stream")
+ write!(f, "Http Write Stream")
}
}
impl Write for HttpWriteStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- let bytes = Bytes::copy_from_slice(buf);
- let len = bytes.len();
- match self.body_writer.try_send_data(bytes) {
- Ok(()) => Ok(len),
- Err(_) => Err(std::io::Error::from(ErrorKind::Interrupted))
- }
+ let bytes = Bytes::copy_from_slice(buf);
+ let len = bytes.len();
+ match self.body_writer.try_send_data(bytes) {
+ Ok(()) => Ok(len),
+ Err(_) => Err(std::io::Error::from(ErrorKind::Interrupted))
+ }
}
#[inline]
fn flush(&mut self) -> std::io::Result<()> {
- Ok(())
+ Ok(())
}
}
ArenaHeaderTag::NamedTcpStream => Stream::NamedTcp(TypedArenaPtr::new(ptr as *mut _)),
ArenaHeaderTag::NamedTlsStream => Stream::NamedTls(TypedArenaPtr::new(ptr as *mut _)),
ArenaHeaderTag::HttpReadStream => Stream::HttpRead(TypedArenaPtr::new(ptr as *mut _)),
- ArenaHeaderTag::HttpWriteStream => Stream::HttpWrite(TypedArenaPtr::new(ptr as *mut _)),
+ ArenaHeaderTag::HttpWriteStream => Stream::HttpWrite(TypedArenaPtr::new(ptr as *mut _)),
ArenaHeaderTag::ReadlineStream => Stream::Readline(TypedArenaPtr::new(ptr as *mut _)),
ArenaHeaderTag::StaticStringStream => {
Stream::StaticString(TypedArenaPtr::new(ptr as *mut _))
Stream::NamedTcp(ptr) => ptr.header_ptr(),
Stream::NamedTls(ptr) => ptr.header_ptr(),
Stream::HttpRead(ptr) => ptr.header_ptr(),
- Stream::HttpWrite(ptr) => ptr.header_ptr(),
+ Stream::HttpWrite(ptr) => ptr.header_ptr(),
Stream::Null(_) => ptr::null(),
Stream::Readline(ptr) => ptr.header_ptr(),
Stream::StandardOutput(ptr) => ptr.header_ptr(),
Stream::NamedTcp(ref ptr) => &ptr.options,
Stream::NamedTls(ref ptr) => &ptr.options,
Stream::HttpRead(ref ptr) => &ptr.options,
- Stream::HttpWrite(ref ptr) => &ptr.options,
+ Stream::HttpWrite(ref ptr) => &ptr.options,
Stream::Null(ref options) => options,
Stream::Readline(ref ptr) => &ptr.options,
Stream::StandardOutput(ref ptr) => &ptr.options,
Stream::NamedTcp(ref mut ptr) => &mut ptr.options,
Stream::NamedTls(ref mut ptr) => &mut ptr.options,
Stream::HttpRead(ref mut ptr) => &mut ptr.options,
- Stream::HttpWrite(ref mut ptr) => &mut ptr.options,
+ Stream::HttpWrite(ref mut ptr) => &mut ptr.options,
Stream::Null(ref mut options) => options,
Stream::Readline(ref mut ptr) => &mut ptr.options,
Stream::StandardOutput(ref mut ptr) => &mut ptr.options,
Stream::NamedTcp(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::NamedTls(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::HttpRead(ptr) => ptr.lines_read += incr_num_lines_read,
- Stream::HttpWrite(_) => {}
+ Stream::HttpWrite(_) => {}
Stream::Null(_) => {}
Stream::Readline(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::StandardOutput(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::NamedTcp(ptr) => ptr.lines_read = value,
Stream::NamedTls(ptr) => ptr.lines_read = value,
Stream::HttpRead(ptr) => ptr.lines_read = value,
- Stream::HttpWrite(_) => {}
+ Stream::HttpWrite(_) => {}
Stream::Null(_) => {}
Stream::Readline(ptr) => ptr.lines_read = value,
Stream::StandardOutput(ptr) => ptr.lines_read = value,
Stream::NamedTcp(ptr) => ptr.lines_read,
Stream::NamedTls(ptr) => ptr.lines_read,
Stream::HttpRead(ptr) => ptr.lines_read,
- Stream::HttpWrite(_) => 0,
+ Stream::HttpWrite(_) => 0,
Stream::Null(_) => 0,
Stream::Readline(ptr) => ptr.lines_read,
Stream::StandardOutput(ptr) => ptr.lines_read,
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
- Stream::HttpWrite(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
- Stream::HttpWrite(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
- Stream::HttpWrite(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => {}
}
}
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
- Stream::HttpWrite(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => {}
}
}
Stream::StaticString(src) => (*src).read(buf),
Stream::Byte(cursor) => (*cursor).read(buf),
Stream::OutputFile(_)
- | Stream::StandardError(_)
- | Stream::StandardOutput(_)
- | Stream::HttpWrite(_)
- | Stream::Null(_) => Err(std::io::Error::new(
- ErrorKind::PermissionDenied,
- StreamError::ReadFromOutputStream,
- )),
+ | Stream::StandardError(_)
+ | Stream::StandardOutput(_)
+ | Stream::HttpWrite(_)
+ | Stream::Null(_) => Err(std::io::Error::new(
+ ErrorKind::PermissionDenied,
+ StreamError::ReadFromOutputStream,
+ )),
};
bytes_read
Stream::Byte(ref mut cursor) => cursor.get_mut().write(buf),
Stream::StandardOutput(stream) => stream.write(buf),
Stream::StandardError(stream) => stream.write(buf),
- Stream::HttpWrite(ref mut stream) => stream.get_mut().write(buf),
+ Stream::HttpWrite(ref mut stream) => stream.get_mut().write(buf),
Stream::HttpRead(_) |
Stream::StaticString(_) |
Stream::Readline(_) |
Stream::Byte(ref mut cursor) => cursor.stream.get_mut().flush(),
Stream::StandardError(stream) => stream.stream.flush(),
Stream::StandardOutput(stream) => stream.stream.flush(),
- Stream::HttpWrite(ref mut stream) => stream.stream.get_mut().flush(),
+ Stream::HttpWrite(ref mut stream) => stream.stream.get_mut().flush(),
Stream::HttpRead(_) |
Stream::StaticString(_) |
Stream::Readline(_) |
file_stream.position()
}
Stream::NamedTcp(..)
- | Stream::NamedTls(..)
- | Stream::Readline(..)
- | Stream::StaticString(..)
- | Stream::Byte(..) => Some(0),
+ | Stream::NamedTls(..)
+ | Stream::Readline(..)
+ | Stream::StaticString(..)
+ | Stream::Byte(..) => Some(0),
_ => None,
};
Stream::NamedTcp(stream) => stream.past_end_of_stream,
Stream::NamedTls(stream) => stream.past_end_of_stream,
Stream::HttpRead(stream) => stream.past_end_of_stream,
- Stream::HttpWrite(stream) => stream.past_end_of_stream,
+ Stream::HttpWrite(stream) => stream.past_end_of_stream,
Stream::Null(_) => false,
Stream::Readline(stream) => stream.past_end_of_stream,
Stream::StandardOutput(stream) => stream.past_end_of_stream,
Stream::NamedTcp(stream) => stream.past_end_of_stream = value,
Stream::NamedTls(stream) => stream.past_end_of_stream = value,
Stream::HttpRead(stream) => stream.past_end_of_stream = value,
- Stream::HttpWrite(stream) => stream.past_end_of_stream = value,
+ Stream::HttpWrite(stream) => stream.past_end_of_stream = value,
Stream::Null(_) => {}
Stream::Readline(stream) => stream.past_end_of_stream = value,
Stream::StandardOutput(stream) => stream.past_end_of_stream = value,
pub(crate) fn mode(&self) -> Atom {
match self {
Stream::Byte(_)
- | Stream::Readline(_)
- | Stream::StaticString(_)
- | Stream::HttpRead(_)
- | Stream::InputFile(..) => atom!("read"),
+ | Stream::Readline(_)
+ | Stream::StaticString(_)
+ | Stream::HttpRead(_)
+ | Stream::InputFile(..) => atom!("read"),
Stream::NamedTcp(..) | Stream::NamedTls(..) => atom!("read_append"),
Stream::OutputFile(file) if file.is_append => atom!("append"),
Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | Stream::HttpWrite(_) => atom!("write"),
#[inline]
pub(crate) fn from_http_sender(
- body_writer: Sender,
- arena: &mut Arena,
+ body_writer: Sender,
+ arena: &mut Arena,
) -> Self {
- Stream::HttpWrite(arena_alloc!(
- StreamLayout::new(CharReader::new(HttpWriteStream {
- body_writer
- })),
- arena
- ))
+ Stream::HttpWrite(arena_alloc!(
+ StreamLayout::new(CharReader::new(HttpWriteStream {
+ body_writer
+ })),
+ arena
+ ))
}
#[inline]
pub(crate) fn is_input_stream(&self) -> bool {
match self {
Stream::NamedTcp(..)
- | Stream::NamedTls(..)
- | Stream::HttpRead(..)
- | Stream::Byte(_)
- | Stream::Readline(_)
- | Stream::StaticString(_)
- | Stream::InputFile(..) => true,
+ | Stream::NamedTls(..)
+ | Stream::HttpRead(..)
+ | Stream::Byte(_)
+ | Stream::Readline(_)
+ | Stream::StaticString(_)
+ | Stream::InputFile(..) => true,
_ => false,
}
}
pub(crate) fn is_output_stream(&self) -> bool {
match self {
Stream::StandardError(_)
- | Stream::StandardOutput(_)
- | Stream::NamedTcp(..)
- | Stream::NamedTls(..)
- | Stream::HttpWrite(..)
- | Stream::Byte(_)
- | Stream::OutputFile(..) => true,
+ | Stream::StandardOutput(_)
+ | Stream::NamedTcp(..)
+ | Stream::NamedTls(..)
+ | Stream::HttpWrite(..)
+ | Stream::Byte(_)
+ | Stream::OutputFile(..) => true,
_ => false,
}
}
stream_type: HeapCellValue,
) -> StreamOptions {
let alias = read_heap_cell!(self.store(MachineState::deref(self, alias)),
- (HeapCellValueTag::Atom, (name, arity)) => {
- debug_assert_eq!(arity, 0);
-
- if name != atom!("[]") {
- Some(name)
- } else {
- None
- }
- }
- (HeapCellValueTag::Str, s) => {
- let (name, arity) = cell_as_atom_cell!(self.heap[s])
- .get_name_and_arity();
-
- debug_assert_eq!(arity, 0);
-
- if name != atom!("[]") {
- Some(name)
- } else {
- None
- }
- }
- _ => {
- None
- }
+ (HeapCellValueTag::Atom, (name, arity)) => {
+ debug_assert_eq!(arity, 0);
+
+ if name != atom!("[]") {
+ Some(name)
+ } else {
+ None
+ }
+ }
+ (HeapCellValueTag::Str, s) => {
+ let (name, arity) = cell_as_atom_cell!(self.heap[s])
+ .get_name_and_arity();
+
+ debug_assert_eq!(arity, 0);
+
+ if name != atom!("[]") {
+ Some(name)
+ } else {
+ None
+ }
+ }
+ _ => {
+ None
+ }
);
let eof_action = read_heap_cell!(self.store(MachineState::deref(self, eof_action)),
- (HeapCellValueTag::Atom, (name, arity)) => {
- debug_assert_eq!(arity, 0);
-
- match name {
- atom!("eof_code") => EOFAction::EOFCode,
- atom!("error") => EOFAction::Error,
- atom!("reset") => EOFAction::Reset,
- _ => unreachable!(),
- }
- }
- (HeapCellValueTag::Str, s) => {
- let (name, arity) = cell_as_atom_cell!(self.heap[s])
- .get_name_and_arity();
-
- debug_assert_eq!(arity, 0);
-
- match name {
- atom!("eof_code") => EOFAction::EOFCode,
- atom!("error") => EOFAction::Error,
- atom!("reset") => EOFAction::Reset,
- _ => unreachable!(),
- }
- }
- _ => {
- unreachable!()
- }
+ (HeapCellValueTag::Atom, (name, arity)) => {
+ debug_assert_eq!(arity, 0);
+
+ match name {
+ atom!("eof_code") => EOFAction::EOFCode,
+ atom!("error") => EOFAction::Error,
+ atom!("reset") => EOFAction::Reset,
+ _ => unreachable!(),
+ }
+ }
+ (HeapCellValueTag::Str, s) => {
+ let (name, arity) = cell_as_atom_cell!(self.heap[s])
+ .get_name_and_arity();
+
+ debug_assert_eq!(arity, 0);
+
+ match name {
+ atom!("eof_code") => EOFAction::EOFCode,
+ atom!("error") => EOFAction::Error,
+ atom!("reset") => EOFAction::Reset,
+ _ => unreachable!(),
+ }
+ }
+ _ => {
+ unreachable!()
+ }
);
let reposition = read_heap_cell!(self.store(MachineState::deref(self, reposition)),
- (HeapCellValueTag::Atom, (name, arity)) => {
- debug_assert_eq!(arity, 0);
- name == atom!("true")
- }
- (HeapCellValueTag::Str, s) => {
- let (name, arity) = cell_as_atom_cell!(self.heap[s])
- .get_name_and_arity();
-
- debug_assert_eq!(arity, 0);
- name == atom!("true")
- }
- _ => {
- unreachable!()
- }
+ (HeapCellValueTag::Atom, (name, arity)) => {
+ debug_assert_eq!(arity, 0);
+ name == atom!("true")
+ }
+ (HeapCellValueTag::Str, s) => {
+ let (name, arity) = cell_as_atom_cell!(self.heap[s])
+ .get_name_and_arity();
+
+ debug_assert_eq!(arity, 0);
+ name == atom!("true")
+ }
+ _ => {
+ unreachable!()
+ }
);
let stream_type = read_heap_cell!(self.store(MachineState::deref(self, stream_type)),
- (HeapCellValueTag::Atom, (name, arity)) => {
- debug_assert_eq!(arity, 0);
- match name {
- atom!("text") => StreamType::Text,
- atom!("binary") => StreamType::Binary,
- _ => unreachable!(),
- }
- }
- (HeapCellValueTag::Str, s) => {
- let (name, arity) = cell_as_atom_cell!(self.heap[s])
- .get_name_and_arity();
-
- debug_assert_eq!(arity, 0);
- match name {
- atom!("text") => StreamType::Text,
- atom!("binary") => StreamType::Binary,
- _ => unreachable!(),
- }
- }
- _ => {
- unreachable!()
- }
+ (HeapCellValueTag::Atom, (name, arity)) => {
+ debug_assert_eq!(arity, 0);
+ match name {
+ atom!("text") => StreamType::Text,
+ atom!("binary") => StreamType::Binary,
+ _ => unreachable!(),
+ }
+ }
+ (HeapCellValueTag::Str, s) => {
+ let (name, arity) = cell_as_atom_cell!(self.heap[s])
+ .get_name_and_arity();
+
+ debug_assert_eq!(arity, 0);
+ match name {
+ atom!("text") => StreamType::Text,
+ atom!("binary") => StreamType::Binary,
+ _ => unreachable!(),
+ }
+ }
+ _ => {
+ unreachable!()
+ }
);
let mut options = StreamOptions::default();
let addr = self.store(MachineState::deref(self, addr));
read_heap_cell!(addr,
- (HeapCellValueTag::Atom, (name, arity)) => {
- debug_assert_eq!(arity, 0);
-
- return match stream_aliases.get(&name) {
- Some(stream) if !stream.is_null_stream() => Ok(*stream),
- _ => {
- let stub = functor_stub(caller, arity);
- let addr = atom_as_cell!(name);
-
- let existence_error = self.existence_error(ExistenceError::Stream(addr));
-
- Err(self.error_form(existence_error, stub))
- }
- };
- }
- (HeapCellValueTag::Str, s) => {
- let (name, arity) = cell_as_atom_cell!(self.heap[s])
- .get_name_and_arity();
-
- debug_assert_eq!(arity, 0);
-
- return match stream_aliases.get(&name) {
- Some(stream) if !stream.is_null_stream() => Ok(*stream),
- _ => {
- let stub = functor_stub(caller, arity);
- let addr = atom_as_cell!(name);
-
- let existence_error = self.existence_error(ExistenceError::Stream(addr));
-
- Err(self.error_form(existence_error, stub))
- }
- };
- }
- (HeapCellValueTag::Cons, ptr) => {
- match_untyped_arena_ptr!(ptr,
- (ArenaHeaderTag::Stream, stream) => {
- return if stream.is_null_stream() {
- Err(self.open_permission_error(stream_as_cell!(stream), caller, arity))
- } else {
- Ok(stream)
- };
- }
- (ArenaHeaderTag::Dropped, _value) => {
- let stub = functor_stub(caller, arity);
- let err = self.existence_error(ExistenceError::Stream(addr));
-
- return Err(self.error_form(err, stub));
- }
- _ => {
- }
- );
- }
- _ => {
- }
+ (HeapCellValueTag::Atom, (name, arity)) => {
+ debug_assert_eq!(arity, 0);
+
+ return match stream_aliases.get(&name) {
+ Some(stream) if !stream.is_null_stream() => Ok(*stream),
+ _ => {
+ let stub = functor_stub(caller, arity);
+ let addr = atom_as_cell!(name);
+
+ let existence_error = self.existence_error(ExistenceError::Stream(addr));
+
+ Err(self.error_form(existence_error, stub))
+ }
+ };
+ }
+ (HeapCellValueTag::Str, s) => {
+ let (name, arity) = cell_as_atom_cell!(self.heap[s])
+ .get_name_and_arity();
+
+ debug_assert_eq!(arity, 0);
+
+ return match stream_aliases.get(&name) {
+ Some(stream) if !stream.is_null_stream() => Ok(*stream),
+ _ => {
+ let stub = functor_stub(caller, arity);
+ let addr = atom_as_cell!(name);
+
+ let existence_error = self.existence_error(ExistenceError::Stream(addr));
+
+ Err(self.error_form(existence_error, stub))
+ }
+ };
+ }
+ (HeapCellValueTag::Cons, ptr) => {
+ match_untyped_arena_ptr!(ptr,
+ (ArenaHeaderTag::Stream, stream) => {
+ return if stream.is_null_stream() {
+ Err(self.open_permission_error(stream_as_cell!(stream), caller, arity))
+ } else {
+ Ok(stream)
+ };
+ }
+ (ArenaHeaderTag::Dropped, _value) => {
+ let stub = functor_stub(caller, arity);
+ let err = self.existence_error(ExistenceError::Stream(addr));
+
+ return Err(self.error_form(err, stub));
+ }
+ _ => {
+ }
+ );
+ }
+ _ => {
+ }
);
let stub = functor_stub(caller, arity);