1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
use std::cmp; use std::io::{self, Read, Write, Stdout}; use std::thread; use bytes::BytesMut; use futures::{Future, Stream, Sink, Poll, Async}; use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; pub struct Stdin { rx_stdin: mpsc::UnboundedReceiver<io::Result<Vec<u8>>>, buffer: BytesMut, eof: bool, } impl Read for Stdin { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { if self.eof { return Ok(0); } if self.buffer.len() > 0 { let len = cmp::min(self.buffer.len(), buf.len()); buf[0..len].copy_from_slice(&self.buffer[0..len]); self.buffer.split_to(len); return Ok(len); } debug_assert_eq!(self.buffer.len(), 0); match self.rx_stdin.poll() { Ok(Async::Ready(Some(Ok(bytes)))) => { let len = cmp::min(bytes.len(), buf.len()); buf[0..len].copy_from_slice(&bytes[0..len]); if len < bytes.len() { self.buffer.extend_from_slice(&bytes[len..]); } Ok(len) } Ok(Async::Ready(Some(Err(err)))) => Err(err), Ok(Async::Ready(None)) => { self.eof = true; Ok(0) } Ok(Async::NotReady) => Err(io::Error::new(io::ErrorKind::WouldBlock, "Not ready")), Err(()) => Err(io::Error::new(io::ErrorKind::Other, "broken pipe")), } } } impl AsyncRead for Stdin {} pub fn stdin(chunk_size: usize) -> Stdin { assert!(chunk_size > 0); let (mut tx_stdin, rx_stdin) = mpsc::unbounded(); thread::spawn(move || { let stdin = io::stdin(); let mut locked_stdin = stdin.lock(); loop { let mut bytes = vec![0u8; chunk_size]; match locked_stdin.read(&mut bytes) { Ok(n_bytes) => { bytes.truncate(n_bytes); match tx_stdin.send(Ok(bytes)).wait() { Ok(t) => tx_stdin = t, Err(_) => break, } } Err(err) => { let _ = tx_stdin.send(Err(err)).wait(); break; } } } }); Stdin { rx_stdin, buffer: BytesMut::new(), eof: false, } } pub struct StdioStream { stdin: Stdin, stdout: Stdout, } impl StdioStream { pub fn new(chunk_size: usize) -> Self { let stdin = stdin(chunk_size); let stdout = io::stdout(); StdioStream { stdin, stdout } } } impl Read for StdioStream { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.stdin.read(buf) } } impl AsyncRead for StdioStream {} impl Write for StdioStream { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.stdout.write(buf) } fn flush(&mut self) -> io::Result<()> { self.stdout.flush() } } impl AsyncWrite for StdioStream { fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(Async::Ready(())) } }