Created
January 8, 2021 15:45
-
-
Save psychon/871644d18d3c61853d0b5cd3e876290d to your computer and use it in GitHub Desktop.
Patch ontop of #254 to make the X11 I/O nonblocking (callback based)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/anvil/src/xwayland/mod.rs b/anvil/src/xwayland/mod.rs | |
index 322972ed..44689c1b 100644 | |
--- a/anvil/src/xwayland/mod.rs | |
+++ b/anvil/src/xwayland/mod.rs | |
@@ -10,17 +10,15 @@ use smithay::{ | |
}; | |
use x11rb::{ | |
- connection::Connection as _, | |
- errors::ReplyOrIdError, | |
+ errors::{ConnectionError, ReplyOrIdError}, | |
protocol::{ | |
- composite::{ConnectionExt as _, Redirect}, | |
+ composite::{self, ConnectionExt as _, Redirect}, | |
xproto::{ | |
ChangeWindowAttributesAux, ConfigWindow, ConfigureWindowAux, ConnectionExt as _, EventMask, | |
Window, WindowClass, | |
}, | |
Event, | |
}, | |
- rust_connection::{DefaultStream, RustConnection}, | |
}; | |
use crate::{ | |
@@ -29,9 +27,11 @@ use crate::{ | |
AnvilState, | |
}; | |
-use x11rb_event_source::X11Source; | |
+//use x11rb_event_source::X11Source; | |
+use x11_connection::{CookieExt, X11Connection}; | |
-mod x11rb_event_source; | |
+mod x11_connection; | |
+//mod x11rb_event_source; TODO: Remove the file for the module | |
/// Implementation of [`smithay::xwayland::XWindowManager`] that is used for starting XWayland. | |
/// After XWayland was started, the actual state is kept in `X11State`. | |
@@ -65,10 +65,13 @@ impl XWindowManager for XWm { | |
let wm = Rc::new(RefCell::new(wm)); | |
client.data_map().insert_if_missing(|| Rc::clone(&wm)); | |
self.handle | |
- .insert_source(source, move |events, _, _| { | |
- let mut wm = wm.borrow_mut(); | |
+ .insert_source(source, move |events, conn, _| { | |
+ // FIXME: I couldn't figure out how to pass this through without a raw pointer | |
+ let conn = unsafe { &**conn }; | |
+ // FIXME: This is also quite a bad hack | |
+ wm.borrow_mut().conn = Some(conn as _); | |
for event in events.into_iter() { | |
- wm.handle_event(event, &client)?; | |
+ wm.borrow_mut().handle_event(event?, &client, conn, Rc::clone(&wm))?; | |
} | |
Ok(()) | |
}) | |
@@ -78,21 +81,57 @@ impl XWindowManager for XWm { | |
fn xwayland_exited(&mut self) {} | |
} | |
-x11rb::atom_manager! { | |
- Atoms: AtomsCookie { | |
- WM_S0, | |
- WL_SURFACE_ID, | |
+enum LazyAtom { | |
+ Pending(Vec<Box<dyn for<'a> FnOnce(&'a X11Connection, u32)>>), | |
+ Resolved(u32), | |
+} | |
+ | |
+impl LazyAtom { | |
+ fn new(conn: &X11Connection, name: &[u8]) -> Result<Rc<RefCell<Self>>, ConnectionError> { | |
+ let atom = LazyAtom::Pending(Vec::new()); | |
+ let atom = Rc::new(RefCell::new(atom)); | |
+ let clone = Rc::clone(&atom); | |
+ conn.intern_atom(false, name)? | |
+ .on_reply(conn, move |conn, atom| { | |
+ let atom = atom.unwrap().atom; | |
+ match clone.replace(LazyAtom::Resolved(atom)) { | |
+ LazyAtom::Pending(callbacks) => { | |
+ for cb in callbacks.into_iter() { | |
+ cb(conn, atom) | |
+ } | |
+ } | |
+ LazyAtom::Resolved(_) => unreachable!(), | |
+ } | |
+ }); | |
+ Ok(atom) | |
+ } | |
+ | |
+ fn unwrap(&self) -> u32 { | |
+ match self { | |
+ LazyAtom::Resolved(atom) => *atom, | |
+ _ => unreachable!(), | |
+ } | |
+ } | |
+ | |
+ fn on_reply<C>(inst: &Rc<RefCell<Self>>, conn: &X11Connection, callback: C) | |
+ where | |
+ C: for<'b> FnOnce(&'b X11Connection, u32) + 'static | |
+ { | |
+ match *inst.borrow_mut() { | |
+ Self::Resolved(atom) => callback(conn, atom), | |
+ Self::Pending(ref mut cbs) => cbs.push(Box::new(callback)), | |
+ } | |
} | |
} | |
/// The actual runtime state of the XWayland integration. | |
struct X11State { | |
- conn: Rc<RustConnection>, | |
- atoms: Atoms, | |
log: slog::Logger, | |
- unpaired_surfaces: HashMap<u32, (Window, (i32, i32))>, | |
+ unpaired_surfaces: HashMap<u32, Window>, | |
token: CompositorToken<Roles>, | |
window_map: Rc<RefCell<MyWindowMap>>, | |
+ wl_surface_id: Rc<RefCell<LazyAtom>>, | |
+ conn: Option<*const X11Connection>, | |
} | |
impl X11State { | |
@@ -101,12 +140,10 @@ impl X11State { | |
token: CompositorToken<Roles>, | |
window_map: Rc<RefCell<MyWindowMap>>, | |
log: slog::Logger, | |
- ) -> Result<(Self, X11Source), Box<dyn std::error::Error>> { | |
- // Create an X11 connection. XWayland only uses screen 0. | |
- let screen = 0; | |
- let stream = DefaultStream::from_unix_stream(connection)?; | |
- let conn = RustConnection::connect_to_stream(stream, screen)?; | |
- let atoms = Atoms::new(&conn)?.reply()?; | |
+ ) -> Result<(Self, X11Connection), Box<dyn std::error::Error>> { | |
+ let conn = X11Connection::new(connection)?; | |
+ let wm_s0 = LazyAtom::new(&conn, b"WM_S0")?; | |
+ let wl_surface_id = LazyAtom::new(&conn, b"WL_SURFACE_ID")?; | |
let screen = &conn.setup().roots[0]; | |
@@ -117,7 +154,7 @@ impl X11State { | |
)?; | |
// Tell XWayland that we are the WM by acquiring the WM_S0 selection. No X11 clients are accepted before this. | |
- let win = conn.generate_id()?; | |
+ let win = conn.generate_id(); | |
conn.create_window( | |
screen.root_depth, | |
win, | |
@@ -132,27 +169,30 @@ impl X11State { | |
x11rb::COPY_FROM_PARENT, | |
&Default::default(), | |
)?; | |
- conn.set_selection_owner(win, atoms.WM_S0, x11rb::CURRENT_TIME)?; | |
+ LazyAtom::on_reply(&wm_s0, &conn, move |conn, atom| { | |
+ conn.set_selection_owner(win, atom, x11rb::CURRENT_TIME).unwrap(); | |
+ }); | |
// XWayland wants us to do this to function properly...? | |
- conn.composite_redirect_subwindows(screen.root, Redirect::Manual)?; | |
- | |
- conn.flush()?; | |
+ let root = screen.root; | |
+ conn.on_extension_information(composite::X11_EXTENSION_NAME, move |conn, res| { | |
+ assert_eq!(Ok(()), res, "Xwayland does not support the composite extension?!"); | |
+ conn.composite_redirect_subwindows(root, Redirect::Manual).unwrap(); | |
+ }); | |
- let conn = Rc::new(conn); | |
let wm = Self { | |
- conn: Rc::clone(&conn), | |
- atoms, | |
unpaired_surfaces: Default::default(), | |
token, | |
window_map, | |
log, | |
+ wl_surface_id, | |
+ conn: None, | |
}; | |
- Ok((wm, X11Source::new(conn))) | |
+ Ok((wm, conn)) | |
} | |
- fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> { | |
+ fn handle_event(&mut self, event: Event, client: &Client, conn: &X11Connection, rc: Rc<RefCell<Self>>) -> Result<(), ReplyOrIdError> { | |
debug!(self.log, "X11: Got event {:?}", event); | |
match event { | |
Event::ConfigureRequest(r) => { | |
@@ -179,35 +219,20 @@ impl X11State { | |
if r.value_mask & u16::from(ConfigWindow::BorderWidth) != 0 { | |
aux = aux.border_width(u32::try_from(r.border_width).unwrap()); | |
} | |
- self.conn.configure_window(r.window, &aux)?; | |
+ conn.configure_window(r.window, &aux)?; | |
} | |
Event::MapRequest(r) => { | |
// Just grant the wish | |
- self.conn.map_window(r.window)?; | |
+ conn.map_window(r.window)?; | |
} | |
Event::ClientMessage(msg) => { | |
- if msg.type_ == self.atoms.WL_SURFACE_ID { | |
+ if msg.type_ == self.wl_surface_id.borrow().unwrap() { | |
// We get a WL_SURFACE_ID message when Xwayland creates a WlSurface for a | |
// window. Both the creation of the surface and this client message happen at | |
// roughly the same time and are sent over different sockets (X11 socket and | |
// wayland socket). Thus, we could receive these two in any order. Hence, it | |
// can happen that we get None below when X11 was faster than Wayland. | |
- let location = { | |
- match self.conn.get_geometry(msg.window)?.reply() { | |
- Ok(geo) => (geo.x.into(), geo.y.into()), | |
- Err(err) => { | |
- error!( | |
- self.log, | |
- "Failed to get geometry for {:x}, perhaps the window was already destroyed?", | |
- msg.window; | |
- "err" => format!("{:?}", err), | |
- ); | |
- (0, 0) | |
- } | |
- } | |
- }; | |
- | |
let id = msg.data.as_data32()[0]; | |
let surface = client.get_resource::<WlSurface>(id); | |
info!( | |
@@ -216,9 +241,9 @@ impl X11State { | |
); | |
match surface { | |
None => { | |
- self.unpaired_surfaces.insert(id, (msg.window, location)); | |
+ self.unpaired_surfaces.insert(id, msg.window); | |
} | |
- Some(surface) => self.new_window(msg.window, surface, location), | |
+ Some(surface) => self.new_window(msg.window, surface, Rc::clone(&rc)), | |
} | |
} | |
} | |
@@ -227,19 +252,40 @@ impl X11State { | |
Ok(()) | |
} | |
- fn new_window(&mut self, window: Window, surface: WlSurface, location: (i32, i32)) { | |
+ fn new_window(&mut self, window: Window, surface: WlSurface, clone: Rc<RefCell<Self>>) { | |
debug!(self.log, "Matched X11 surface {:x?} to {:x?}", window, surface); | |
- if self.token.give_role_with(&surface, X11SurfaceRole).is_err() { | |
- // It makes no sense to post a protocol error here since that would only kill Xwayland | |
- error!(self.log, "Surface {:x?} already has a role?!", surface); | |
- return; | |
- } | |
+ let conn = self.conn.unwrap(); | |
+ let conn = unsafe { &*conn }; | |
+ conn.get_geometry(window) | |
+ .unwrap() | |
+ .on_reply(conn, move |_, reply| { | |
+ let wm = clone.borrow_mut(); | |
+ | |
+ let location = match reply { | |
+ Ok(geo) => (geo.x.into(), geo.y.into()), | |
+ Err(err) => { | |
+ error!( | |
+ wm.log, | |
+ "Failed to get geometry for {:x}, perhaps the window was already destroyed?", | |
+ window; | |
+ "err" => format!("{:?}", err), | |
+ ); | |
+ (0, 0) | |
+ } | |
+ }; | |
+ | |
+ if wm.token.give_role_with(&surface, X11SurfaceRole).is_err() { | |
+ // It makes no sense to post a protocol error here since that would only kill Xwayland | |
+ error!(wm.log, "Surface {:x?} already has a role?!", surface); | |
+ return; | |
+ } | |
- let x11surface = X11Surface { surface }; | |
- self.window_map | |
- .borrow_mut() | |
- .insert(Kind::X11(x11surface), location); | |
+ let x11surface = X11Surface { surface }; | |
+ wm.window_map | |
+ .borrow_mut() | |
+ .insert(Kind::X11(x11surface), location); | |
+ }); | |
} | |
} | |
@@ -248,11 +294,12 @@ pub fn commit_hook(surface: &WlSurface) { | |
// Is this the Xwayland client? | |
if let Some(client) = surface.as_ref().client() { | |
if let Some(x11) = client.data_map().get::<Rc<RefCell<X11State>>>() { | |
+ let clone = Rc::clone(&x11); | |
let mut inner = x11.borrow_mut(); | |
// Is the surface among the unpaired surfaces (see comment next to WL_SURFACE_ID | |
// handling above) | |
- if let Some((window, location)) = inner.unpaired_surfaces.remove(&surface.as_ref().id()) { | |
- inner.new_window(window, surface.clone(), location); | |
+ if let Some(window) = inner.unpaired_surfaces.remove(&surface.as_ref().id()) { | |
+ inner.new_window(window, surface.clone(), clone); | |
} | |
} | |
} | |
diff --git a/anvil/src/xwayland/x11_connection.rs b/anvil/src/xwayland/x11_connection.rs | |
new file mode 100644 | |
index 00000000..2a86a257 | |
--- /dev/null | |
+++ b/anvil/src/xwayland/x11_connection.rs | |
@@ -0,0 +1,606 @@ | |
+use std::{ | |
+ cell::{Cell, RefCell}, | |
+ collections::{hash_map::{Entry, HashMap}, VecDeque}, | |
+ convert::{TryFrom, TryInto}, | |
+ io::{Error as IOError, ErrorKind, IoSlice, Read, Result as IOResult, Write}, | |
+ os::unix::net::UnixStream, | |
+ rc::Rc, | |
+}; | |
+ | |
+use smithay::reexports::calloop::{ | |
+ generic::Generic, | |
+ EventSource, Interest, Mode, Poll, Readiness, Token, | |
+}; | |
+ | |
+use x11rb::{ | |
+ connection::{DiscardMode, SequenceNumber, ReplyOrError, RequestConnection, RequestKind}, | |
+ cookie::{Cookie, CookieWithFds, VoidCookie}, | |
+ errors::{ConnectError, ConnectionError, ParseError, ReplyError, ReplyOrIdError}, | |
+ protocol::{ | |
+ Event, | |
+ xproto::{ | |
+ ConnectionExt as _, | |
+ Setup, | |
+ SetupAuthenticate, | |
+ SetupFailed, | |
+ SetupRequest, | |
+ GE_GENERIC_EVENT, | |
+ KEYMAP_NOTIFY_EVENT, | |
+ }, | |
+ }, | |
+ utils::RawFdContainer, | |
+ x11_utils::{ExtensionInformation, ExtInfoProvider, Serialize, TryParse, X11Error}, | |
+}; | |
+ | |
+pub type BufWithFds = x11rb::connection::BufWithFds<Vec<u8>>; | |
+ | |
+pub trait CookieExt<R: for<'a> TryFrom<&'a [u8], Error = ParseError>> { | |
+ fn on_reply<C>(self, conn: &X11Connection, callback: C) | |
+ where | |
+ C: for<'b> FnOnce(&'b X11Connection, Result<R, ReplyError>) + 'static; | |
+} | |
+ | |
+impl<R: for<'a> TryFrom<&'a [u8], Error = ParseError>, Conn: RequestConnection + ?Sized> CookieExt<R> for Cookie<'_, Conn, R> { | |
+ fn on_reply<C>(self, conn: &X11Connection, callback: C) | |
+ where | |
+ R: for<'a> TryFrom<&'a [u8], Error = ParseError>, | |
+ C: for<'b> FnOnce(&'b X11Connection, Result<R, ReplyError>) + 'static | |
+ { | |
+ let sequence = self.sequence_number(); | |
+ std::mem::forget(self); | |
+ conn.on_reply(sequence, Box::new(|conn, reply| { | |
+ match reply { | |
+ Ok(reply) => { | |
+ match R::try_from(&reply[..]) { | |
+ Ok(reply) => callback(conn, Ok(reply)), | |
+ Err(err) => callback(conn, Err(ReplyError::ConnectionError(err.into()))) | |
+ } | |
+ } | |
+ Err(err) => callback(conn, Err(err.into())) | |
+ } | |
+ })); | |
+ } | |
+} | |
+ | |
+pub struct X11Connection { | |
+ stream: Generic<UnixStream>, | |
+ setup: Setup, | |
+ next_id: Cell<u32>, | |
+ sequence_write: Cell<SequenceNumber>, | |
+ sequence_need_sync: Cell<SequenceNumber>, | |
+ sequence_read: Cell<SequenceNumber>, | |
+ read_buffer: RefCell<Vec<u8>>, | |
+ write_buffer: RefCell<Vec<u8>>, | |
+ extension_manager: Rc<RefCell<ExtensionManager>>, | |
+ replies: RefCell<VecDeque<(SequenceNumber, Box<dyn for<'b> FnOnce(&'b X11Connection, Result<Vec<u8>, ReplyError>)>)>>, | |
+} | |
+ | |
+impl X11Connection { | |
+ pub fn new(mut stream: UnixStream) -> Result<Self, ConnectError> { | |
+ // This does blocking I/O because the extra complexity for doing this non-blockingly is not | |
+ // worth it: Xwayland just started and we are the only X11 client. This should be quick. | |
+ stream.set_nonblocking(false)?; | |
+ let setup = setup(&mut stream)?; | |
+ stream.set_nonblocking(true)?; | |
+ let stream = Generic::new(stream, Interest::Both, Mode::Edge); | |
+ Ok(Self { | |
+ stream, | |
+ setup, | |
+ next_id: Cell::new(0), | |
+ sequence_write: Cell::new(0), | |
+ sequence_need_sync: Cell::new(u16::max_value().into()), | |
+ sequence_read: Cell::new(0), | |
+ read_buffer: RefCell::new(Vec::new()), | |
+ write_buffer: RefCell::new(Vec::new()), | |
+ extension_manager: Rc::new(RefCell::new(Default::default())), | |
+ replies: RefCell::new(Default::default()), | |
+ }) | |
+ } | |
+ | |
+ pub fn setup(&self) -> &Setup { | |
+ &self.setup | |
+ } | |
+ | |
+ pub fn generate_id(&self) -> u32 { | |
+ let (id_base, id_mask) = (self.setup.resource_id_base, self.setup.resource_id_mask); | |
+ let id = self.next_id.get(); | |
+ | |
+ // Find the right-most set bit in the mask | |
+ let increment = id_mask & (1 + !id_mask); | |
+ self.next_id.set(self.next_id.get() + increment); | |
+ | |
+ let result = id | id_base; | |
+ assert!(result <= id_base | id_mask, "Exhausted available XIDs and no one implemented XC-MISC yet"); | |
+ result | |
+ } | |
+ | |
+ fn on_reply(&self, sequence: SequenceNumber, callback: Box<dyn for<'b> FnOnce(&'b X11Connection, Result<Vec<u8>, ReplyError>)>) { | |
+ self.replies.borrow_mut().push_back((sequence, callback)); | |
+ } | |
+ | |
+ fn write_request(&self, request: &[IoSlice<'_>], has_reply: bool) -> Result<SequenceNumber, ConnectionError> { | |
+ if !has_reply && self.sequence_write == self.sequence_need_sync { | |
+ self.get_input_focus()?; | |
+ } | |
+ | |
+ let sequence = { | |
+ let sequence_write = self.sequence_write.get(); | |
+ self.sequence_write.set(sequence_write + 1); | |
+ sequence_write + 1 | |
+ }; | |
+ | |
+ if has_reply { | |
+ self.sequence_need_sync.set(sequence + SequenceNumber::from(u16::max_value())); | |
+ } | |
+ | |
+ { | |
+ let mut write_buffer = self.write_buffer.borrow_mut(); | |
+ for part in request { | |
+ write_buffer.extend(&**part); | |
+ } | |
+ } | |
+ self.flush()?; | |
+ | |
+ Ok(sequence) | |
+ } | |
+ | |
+ // Try to flush the write buffer | |
+ fn flush(&self) -> IOResult<()> { | |
+ let mut write_buffer = self.write_buffer.borrow_mut(); | |
+ if write_buffer.is_empty() { | |
+ // The stream is writable, but we have nothing to write. Since we are edge-triggered, | |
+ // calloop will stop informing us about writability. | |
+ return Ok(()); | |
+ } | |
+ loop { | |
+ match (&self.stream.file).write(&write_buffer[..]) { | |
+ Ok(0) => return Err(IOError::new(ErrorKind::WriteZero, "failed to write the buffered data")), | |
+ Ok(n) => { | |
+ let _ = write_buffer.drain(..n); | |
+ if write_buffer.is_empty() { | |
+ return Ok(()); | |
+ } | |
+ // Try again in another loop iteration | |
+ }, | |
+ Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(()), | |
+ Err(e) => return Err(e), | |
+ } | |
+ } | |
+ } | |
+ | |
+ // Read from the connection and then call the callback with the found packets | |
+ fn read<C>(&mut self, mut callback: C) -> Result<(), ReplyOrIdError> | |
+ where | |
+ C: FnMut(Vec<Result<Event, ParseError>>, &mut *const X11Connection) -> Result<(), ReplyOrIdError> | |
+ { | |
+ self.do_read().map_err(|err| ReplyOrIdError::ConnectionError(err.into()))?; | |
+ let packets = self.read_packets(); | |
+ let mut events = Vec::new(); | |
+ for packet in packets.into_iter() { | |
+ // Everything has a sequence number. With one exception. :-( | |
+ let sequence = if packet[0] == KEYMAP_NOTIFY_EVENT { | |
+ self.sequence_read.get() | |
+ } else { | |
+ // On the wire, the sequence number has 16 bits. That can wrap. Expand this to 64 | |
+ // bits that cannot possible wrap. | |
+ let sequence = u16::from_ne_bytes([packet[2], packet[3]]); | |
+ let high_bytes = self.sequence_read.get() & !SequenceNumber::from(u16::max_value()); | |
+ let mut full_number = SequenceNumber::from(sequence) | high_bytes; | |
+ if full_number < self.sequence_read.get() { | |
+ full_number += SequenceNumber::from(u16::max_value()) + 1; | |
+ } | |
+ full_number | |
+ }; | |
+ self.sequence_read.set(sequence); | |
+ | |
+ let next_sync = sequence + SequenceNumber::from(u16::max_value()); | |
+ if self.sequence_need_sync.get() < next_sync { | |
+ self.sequence_need_sync.set(next_sync); | |
+ } | |
+ | |
+ let mut replies = self.replies.borrow_mut(); | |
+ let callback = if let Some((seqno, _)) = replies.front() { | |
+ if *seqno == sequence { | |
+ Some(replies.pop_front().unwrap().1) | |
+ } else { | |
+ assert!(*seqno > sequence); | |
+ None | |
+ } | |
+ } else { | |
+ None | |
+ }; | |
+ | |
+ if let Some(callback) = callback { | |
+ let reply = if packet[0] == 0 { | |
+ // This is an X11 error | |
+ match self.parse_error(&packet[..]) { | |
+ Ok(error) => Err(ReplyError::X11Error(error)), | |
+ Err(error) => Err(ReplyError::ConnectionError(error.into())), | |
+ } | |
+ } else { | |
+ // This must be a reply since we have a callback | |
+ assert_eq!(1, packet[0]); | |
+ Ok(packet) | |
+ }; | |
+ callback(self, reply); | |
+ } else { | |
+ events.push(self.parse_event(&packet[..])) | |
+ } | |
+ } | |
+ callback(events, &mut (self as _)) | |
+ } | |
+ | |
+ // Read as many bytes as possible from the stream into the read_buffer | |
+ fn do_read(&mut self) -> IOResult<()> { | |
+ let mut read_buffer = self.read_buffer.borrow_mut(); | |
+ let mut buffer = BufferSize::new(&mut read_buffer); | |
+ | |
+ loop { | |
+ buffer.0.resize(buffer.1 + 4096, 0); | |
+ match self.stream.file.read(&mut buffer.0[buffer.1..]) { | |
+ Ok(0) => return Err(IOError::new(ErrorKind::UnexpectedEof, "Connection closed")), | |
+ Ok(n) => { | |
+ buffer.1 += n; | |
+ // Try again in another loop iteration | |
+ }, | |
+ Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(()), | |
+ Err(e) => return Err(e), | |
+ } | |
+ }; | |
+ | |
+ // Reset the read buffer to the length that was actually read on error | |
+ struct BufferSize<'a>(&'a mut Vec<u8>, usize); | |
+ | |
+ impl<'a> BufferSize<'a> { | |
+ fn new(vec: &'a mut Vec<u8>) -> Self { | |
+ let len = vec.len(); | |
+ Self(vec, len) | |
+ } | |
+ } | |
+ | |
+ impl Drop for BufferSize<'_> { | |
+ fn drop(&mut self) { | |
+ self.0.truncate(self.1); | |
+ } | |
+ } | |
+ } | |
+ | |
+ // Read X11 packets from the connection. | |
+ fn read_packets(&mut self) -> Vec<Vec<u8>> { | |
+ const REPLY: u8 = 1; | |
+ let minimal_packet_length = 32; | |
+ let mut result = Vec::new(); | |
+ | |
+ let mut offset = 0; | |
+ | |
+ let mut read_buffer = self.read_buffer.borrow_mut(); | |
+ while read_buffer.len() - offset >= minimal_packet_length { | |
+ let packet = &read_buffer[offset..]; | |
+ | |
+ let response_type = packet[0]; | |
+ let extra_length = if response_type == REPLY || response_type & 0x7f == GE_GENERIC_EVENT { | |
+ let length_field = packet[4..8].try_into().unwrap(); | |
+ let length_field = u32::from_ne_bytes(length_field) as usize; | |
+ 4 * length_field | |
+ } else { | |
+ 0 | |
+ }; | |
+ let full_packet_length = minimal_packet_length + extra_length; | |
+ | |
+ if read_buffer.len() - offset < full_packet_length { | |
+ break; | |
+ } | |
+ | |
+ result.push(packet[..full_packet_length].to_vec()); | |
+ offset += full_packet_length; | |
+ } | |
+ let _ = read_buffer.drain(..offset); | |
+ | |
+ result | |
+ } | |
+ | |
+ pub fn on_extension_information<C>(&self, extension_name: &'static str, callback: C) | |
+ where | |
+ C: for<'a> FnOnce(&'a X11Connection, Result<(), ()>) + 'static | |
+ { | |
+ let clone = Rc::clone(&self.extension_manager); | |
+ self.extension_manager.borrow_mut().on_extension_information(self, extension_name, clone, Box::new(callback)) | |
+ } | |
+} | |
+ | |
+impl RequestConnection for X11Connection { | |
+ type Buf = Vec<u8>; | |
+ | |
+ fn send_request_with_reply<R>( | |
+ &self, | |
+ bufs: &[IoSlice<'_>], | |
+ fds: Vec<RawFdContainer>, | |
+ ) -> Result<Cookie<'_, Self, R>, ConnectionError> | |
+ where | |
+ R: for<'a> TryFrom<&'a [u8], Error = ParseError> | |
+ { | |
+ assert!(fds.is_empty()); | |
+ let sequence = self.write_request(bufs, true)?; | |
+ Ok(Cookie::new(&self, sequence)) | |
+ } | |
+ | |
+ fn send_request_with_reply_with_fds<R>( | |
+ &self, | |
+ _bufs: &[IoSlice<'_>], | |
+ _fds: Vec<RawFdContainer>, | |
+ ) -> Result<CookieWithFds<'_, Self, R>, ConnectionError> | |
+ where | |
+ R: for<'a> TryFrom<(&'a [u8], Vec<RawFdContainer>), Error = ParseError> | |
+ { | |
+ unimplemented!() | |
+ } | |
+ | |
+ fn send_request_without_reply( | |
+ &self, | |
+ bufs: &[IoSlice<'_>], | |
+ fds: Vec<RawFdContainer>, | |
+ ) -> Result<VoidCookie<'_, Self>, ConnectionError> | |
+ { | |
+ assert!(fds.is_empty()); | |
+ let sequence = self.write_request(bufs, false)?; | |
+ Ok(VoidCookie::new(&self, sequence)) | |
+ } | |
+ | |
+ fn discard_reply(&self, sequence: SequenceNumber, kind: RequestKind, mode: DiscardMode) { | |
+ let _ = (sequence, kind, mode); | |
+ //todo!() | |
+ eprintln!("Ignoring discard_reply() call, this should be fixed eventually, but for now does not cause problems"); | |
+ } | |
+ | |
+ fn prefetch_extension_information( | |
+ &self, | |
+ extension_name: &'static str, | |
+ ) -> Result<(), ConnectionError> { | |
+ self.on_extension_information(extension_name, |_, _| {}); | |
+ Ok(()) | |
+ } | |
+ | |
+ fn extension_information( | |
+ &self, | |
+ extension_name: &'static str, | |
+ ) -> Result<Option<ExtensionInformation>, ConnectionError> { | |
+ self.extension_manager.borrow_mut().extension_information(extension_name) | |
+ } | |
+ | |
+ fn wait_for_reply_or_raw_error( | |
+ &self, | |
+ _sequence: SequenceNumber, | |
+ ) -> Result<ReplyOrError<Vec<u8>>, ConnectionError> { | |
+ unimplemented!() | |
+ } | |
+ | |
+ fn wait_for_reply( | |
+ &self, | |
+ _sequence: SequenceNumber, | |
+ ) -> Result<Option<Vec<u8>>, ConnectionError> { | |
+ unimplemented!() | |
+ } | |
+ | |
+ fn wait_for_reply_with_fds_raw( | |
+ &self, | |
+ _sequence: SequenceNumber, | |
+ ) -> Result<ReplyOrError<BufWithFds, Vec<u8>>, ConnectionError> { | |
+ unimplemented!() | |
+ } | |
+ | |
+ fn check_for_raw_error( | |
+ &self, | |
+ _sequence: SequenceNumber, | |
+ ) -> Result<Option<Vec<u8>>, ConnectionError> { | |
+ unimplemented!() | |
+ } | |
+ | |
+ fn prefetch_maximum_request_bytes(&self) { | |
+ unimplemented!() | |
+ } | |
+ | |
+ fn maximum_request_bytes(&self) -> usize { | |
+ 4 * usize::from(self.setup.maximum_request_length) | |
+ } | |
+ | |
+ fn parse_error(&self, error: &[u8]) -> Result<X11Error, ParseError> { | |
+ X11Error::try_parse(error, &*self.extension_manager.borrow_mut()) | |
+ } | |
+ | |
+ fn parse_event(&self, event: &[u8]) -> Result<Event, ParseError> { | |
+ Event::parse(event, &*self.extension_manager.borrow_mut()) | |
+ } | |
+} | |
+ | |
+impl EventSource for X11Connection { | |
+ type Event = Vec<Result<Event, ParseError>>; | |
+ type Metadata = *const X11Connection; | |
+ type Ret = Result<(), ReplyOrIdError>; | |
+ | |
+ fn process_events<C>(&mut self, readiness: Readiness, _token: Token, callback: C) -> IOResult<()> | |
+ where | |
+ C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, | |
+ { | |
+ if readiness.writable { | |
+ self.flush()?; | |
+ } | |
+ if readiness.readable { | |
+ self.read(callback).map_err(|err| match err { | |
+ ReplyOrIdError::ConnectionError(ConnectionError::IOError(err)) => err, | |
+ err => IOError::new(ErrorKind::Other, err) | |
+ })?; | |
+ } | |
+ Ok(()) | |
+ } | |
+ | |
+ fn register(&mut self, poll: &mut Poll, token: Token) -> IOResult<()> { | |
+ self.stream.register(poll, token) | |
+ } | |
+ | |
+ fn reregister(&mut self, poll: &mut Poll, token: Token) -> IOResult<()> { | |
+ self.stream.reregister(poll, token) | |
+ } | |
+ | |
+ fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> { | |
+ self.stream.unregister(poll) | |
+ } | |
+} | |
+ | |
+fn setup(stream: &mut UnixStream) -> Result<Setup, ConnectError> { | |
+ // Send a SetupRequest | |
+ let data = SetupRequest { | |
+ byte_order: byte_order(), | |
+ protocol_major_version: 11, | |
+ protocol_minor_version: 0, | |
+ authorization_protocol_name: Vec::new(), | |
+ authorization_protocol_data: Vec::new(), | |
+ }.serialize(); | |
+ stream.write_all(&data)?; | |
+ | |
+ // Get the Setup (known to be at least 8 bytes long) | |
+ let mut setup = vec![0; 8]; | |
+ stream.read_exact(&mut setup[..])?; | |
+ // Now we know the length of the full Setup and can read it | |
+ let extra_length = usize::from(u16::from_ne_bytes([setup[6], setup[7]])) * 4; | |
+ setup.reserve_exact(extra_length); | |
+ setup.resize(8 + extra_length, 0); | |
+ stream.read_exact(&mut setup[8..])?; | |
+ match setup[0] { | |
+ // Success! | |
+ 1 => Ok(Setup::try_parse(&setup[..])?.0), | |
+ // The other cases are various errors | |
+ 0 => { | |
+ let failed = SetupFailed::try_parse(&setup[..])?.0; | |
+ Err(ConnectError::SetupFailed(failed)) | |
+ } | |
+ 2 => { | |
+ let failed = SetupAuthenticate::try_parse(&setup[..])?.0; | |
+ Err(ConnectError::SetupAuthenticate(failed)) | |
+ } | |
+ _ => { Err(ParseError::InvalidValue.into()) } | |
+ } | |
+} | |
+ | |
+#[cfg(target_endian = "little")] | |
+fn byte_order() -> u8 { | |
+ 0x6c | |
+} | |
+ | |
+#[cfg(target_endian = "big")] | |
+fn byte_order() -> u8 { | |
+ 0x42 | |
+} | |
+ | |
+#[derive(Default)] | |
+struct ExtensionManager(HashMap<&'static str, ExtState>); | |
+ | |
+pub enum ExtState { | |
+ Requested(Vec<Box<dyn for<'a> FnOnce(&'a X11Connection, Result<(), ()>)>>), | |
+ Present(ExtensionInformation), | |
+ Missing, | |
+} | |
+ | |
+impl ExtensionManager { | |
+ fn on_extension_information<C>(&mut self, conn: &X11Connection, extension_name: &'static str, self_clone: Rc<RefCell<ExtensionManager>>, callback: C) | |
+ where | |
+ C: for<'a> FnOnce(&X11Connection, Result<(), ()>) + 'static | |
+ { | |
+ match self.0.entry(extension_name) { | |
+ Entry::Occupied(entry) => match entry.into_mut() { | |
+ ExtState::Requested(cbs) => cbs.push(Box::new(callback)), | |
+ ExtState::Present(_) => callback(conn, Ok(())), | |
+ ExtState::Missing => callback(conn, Err(())), | |
+ }, | |
+ Entry::Vacant(entry) => { | |
+ conn.query_extension(extension_name.as_bytes()) | |
+ .unwrap() | |
+ .on_reply(conn, move |conn, reply| { | |
+ let info = reply.unwrap(); | |
+ let (result, arg) = if info.present { | |
+ let info = ExtensionInformation { | |
+ major_opcode: info.major_opcode, | |
+ first_event: info.first_event, | |
+ first_error: info.first_error, | |
+ }; | |
+ (ExtState::Present(info), Ok(())) | |
+ } else { | |
+ (ExtState::Missing, Err(())) | |
+ }; | |
+ let old = self_clone.borrow_mut().0.insert(extension_name, result); | |
+ match old { | |
+ Some(ExtState::Requested(cbs)) => { | |
+ for callback in cbs.into_iter() { | |
+ callback(conn, arg); | |
+ } | |
+ } | |
+ _ => unreachable!(), | |
+ } | |
+ }); | |
+ entry.insert(ExtState::Requested(vec![Box::new(callback)])); | |
+ } | |
+ } | |
+ } | |
+ | |
+ fn extension_information( | |
+ &self, | |
+ extension_name: &'static str, | |
+ ) -> Result<Option<ExtensionInformation>, ConnectionError> { | |
+ match self.0.get(extension_name) { | |
+ Some(state) => match state { | |
+ ExtState::Requested(_) => panic!("Extension information not (yet?) available"), | |
+ ExtState::Present(info) => Ok(Some(*info)), | |
+ ExtState::Missing => Ok(None), | |
+ } | |
+ None => panic!("Extension information not (yet?) available") | |
+ } | |
+ } | |
+} | |
+ | |
+impl ExtInfoProvider for ExtensionManager { | |
+ fn get_from_major_opcode(&self, major_opcode: u8) -> Option<(&str, ExtensionInformation)> { | |
+ self.0 | |
+ .iter() | |
+ .filter_map(|(name, state)| { | |
+ if let ExtState::Present(info) = state { | |
+ Some((*name, *info)) | |
+ } else { | |
+ None | |
+ } | |
+ }) | |
+ .find(|(_, info)| info.major_opcode == major_opcode) | |
+ } | |
+ | |
+ fn get_from_event_code(&self, event_code: u8) -> Option<(&str, ExtensionInformation)> { | |
+ self.0 | |
+ .iter() | |
+ .filter_map(|(name, state)| { | |
+ if let ExtState::Present(info) = state { | |
+ if info.first_event <= event_code { | |
+ Some((*name, *info)) | |
+ } else { | |
+ None | |
+ } | |
+ } else { | |
+ None | |
+ } | |
+ }) | |
+ .max_by_key(|(_, info)| info.first_event) | |
+ } | |
+ | |
+ fn get_from_error_code(&self, error_code: u8) -> Option<(&str, ExtensionInformation)> { | |
+ self.0 | |
+ .iter() | |
+ .filter_map(|(name, state)| { | |
+ if let ExtState::Present(info) = state { | |
+ if info.first_error <= error_code { | |
+ Some((*name, *info)) | |
+ } else { | |
+ None | |
+ } | |
+ } else { | |
+ None | |
+ } | |
+ }) | |
+ .max_by_key(|(_, info)| info.first_error) | |
+ } | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment