Skip to content

Instantly share code, notes, and snippets.

@psychon
Created January 8, 2021 15:45
Show Gist options
  • Save psychon/871644d18d3c61853d0b5cd3e876290d to your computer and use it in GitHub Desktop.
Save psychon/871644d18d3c61853d0b5cd3e876290d to your computer and use it in GitHub Desktop.
Patch on top of #254 to make the X11 I/O non-blocking (callback-based)
diff --git a/anvil/src/xwayland/mod.rs b/anvil/src/xwayland/mod.rs
index 322972ed..44689c1b 100644
--- a/anvil/src/xwayland/mod.rs
+++ b/anvil/src/xwayland/mod.rs
@@ -10,17 +10,15 @@ use smithay::{
};
use x11rb::{
- connection::Connection as _,
- errors::ReplyOrIdError,
+ errors::{ConnectionError, ReplyOrIdError},
protocol::{
- composite::{ConnectionExt as _, Redirect},
+ composite::{self, ConnectionExt as _, Redirect},
xproto::{
ChangeWindowAttributesAux, ConfigWindow, ConfigureWindowAux, ConnectionExt as _, EventMask,
Window, WindowClass,
},
Event,
},
- rust_connection::{DefaultStream, RustConnection},
};
use crate::{
@@ -29,9 +27,11 @@ use crate::{
AnvilState,
};
-use x11rb_event_source::X11Source;
+//use x11rb_event_source::X11Source;
+use x11_connection::{CookieExt, X11Connection};
-mod x11rb_event_source;
+mod x11_connection;
+//mod x11rb_event_source; TODO: Remove the file for the module
/// Implementation of [`smithay::xwayland::XWindowManager`] that is used for starting XWayland.
/// After XWayland was started, the actual state is kept in `X11State`.
@@ -65,10 +65,13 @@ impl XWindowManager for XWm {
let wm = Rc::new(RefCell::new(wm));
client.data_map().insert_if_missing(|| Rc::clone(&wm));
self.handle
- .insert_source(source, move |events, _, _| {
- let mut wm = wm.borrow_mut();
+ .insert_source(source, move |events, conn, _| {
+ // FIXME: I couldn't figure out how to pass this through without a raw pointer
+ let conn = unsafe { &**conn };
+ // FIXME: This is also quite a bad hack
+ wm.borrow_mut().conn = Some(conn as _);
for event in events.into_iter() {
- wm.handle_event(event, &client)?;
+ wm.borrow_mut().handle_event(event?, &client, conn, Rc::clone(&wm))?;
}
Ok(())
})
@@ -78,21 +81,57 @@ impl XWindowManager for XWm {
fn xwayland_exited(&mut self) {}
}
-x11rb::atom_manager! {
- Atoms: AtomsCookie {
- WM_S0,
- WL_SURFACE_ID,
+/// An X11 atom whose `InternAtom` reply may not have arrived yet.
+enum LazyAtom {
+ /// The reply is still outstanding; holds the callbacks to run once the atom is known.
+ Pending(Vec<Box<dyn for<'a> FnOnce(&'a X11Connection, u32)>>),
+ /// The reply arrived; holds the atom value.
+ Resolved(u32),
+}
+
+impl LazyAtom {
+    /// Sends an `InternAtom` request for `name` and returns a shared handle that starts out
+    /// `Pending` and becomes `Resolved` when the reply is processed.
+    ///
+    /// # Errors
+    /// Returns the `ConnectionError` produced while sending the request.
+    ///
+    /// # Panics
+    /// The reply handler panics if the server answers the request with an error.
+    fn new(conn: &X11Connection, name: &[u8]) -> Result<Rc<RefCell<Self>>, ConnectionError> {
+        let atom = Rc::new(RefCell::new(LazyAtom::Pending(Vec::new())));
+        let clone = Rc::clone(&atom);
+        conn.intern_atom(false, name)?
+            .on_reply(conn, move |conn, atom| {
+                let atom = atom.unwrap().atom;
+                // `replace` hands back the old state and releases the borrow right away, so
+                // the callbacks below may freely inspect the (now resolved) atom.
+                match clone.replace(LazyAtom::Resolved(atom)) {
+                    LazyAtom::Pending(callbacks) => {
+                        for cb in callbacks {
+                            cb(conn, atom)
+                        }
+                    }
+                    LazyAtom::Resolved(_) => unreachable!(),
+                }
+            });
+        Ok(atom)
+    }
+
+    /// Returns the resolved atom value.
+    ///
+    /// # Panics
+    /// Panics if the `InternAtom` reply has not been processed yet.
+    fn unwrap(&self) -> u32 {
+        match self {
+            LazyAtom::Resolved(atom) => *atom,
+            LazyAtom::Pending(_) => panic!("LazyAtom used before its InternAtom reply arrived"),
+        }
+    }
+
+    /// Runs `callback` with the atom value: immediately if it is already resolved, otherwise
+    /// once the reply arrives.
+    fn on_reply<C>(inst: &Rc<RefCell<Self>>, conn: &X11Connection, callback: C)
+    where
+        C: for<'b> FnOnce(&'b X11Connection, u32) + 'static
+    {
+        let atom = {
+            let mut state = inst.borrow_mut();
+            match &mut *state {
+                Self::Pending(cbs) => {
+                    cbs.push(Box::new(callback));
+                    return;
+                }
+                Self::Resolved(atom) => *atom,
+            }
+        };
+        // The borrow is released before the callback runs so that the callback may itself
+        // access this atom without triggering a `BorrowMutError` panic.
+        callback(conn, atom)
+    }
}
/// The actual runtime state of the XWayland integration.
struct X11State {
- conn: Rc<RustConnection>,
- atoms: Atoms,
log: slog::Logger,
- unpaired_surfaces: HashMap<u32, (Window, (i32, i32))>,
+ unpaired_surfaces: HashMap<u32, Window>,
token: CompositorToken<Roles>,
window_map: Rc<RefCell<MyWindowMap>>,
+ wl_surface_id: Rc<RefCell<LazyAtom>>,
+ conn: Option<*const X11Connection>,
}
impl X11State {
@@ -101,12 +140,10 @@ impl X11State {
token: CompositorToken<Roles>,
window_map: Rc<RefCell<MyWindowMap>>,
log: slog::Logger,
- ) -> Result<(Self, X11Source), Box<dyn std::error::Error>> {
- // Create an X11 connection. XWayland only uses screen 0.
- let screen = 0;
- let stream = DefaultStream::from_unix_stream(connection)?;
- let conn = RustConnection::connect_to_stream(stream, screen)?;
- let atoms = Atoms::new(&conn)?.reply()?;
+ ) -> Result<(Self, X11Connection), Box<dyn std::error::Error>> {
+ let conn = X11Connection::new(connection)?;
+ let wm_s0 = LazyAtom::new(&conn, b"WM_S0")?;
+ let wl_surface_id = LazyAtom::new(&conn, b"WL_SURFACE_ID")?;
let screen = &conn.setup().roots[0];
@@ -117,7 +154,7 @@ impl X11State {
)?;
// Tell XWayland that we are the WM by acquiring the WM_S0 selection. No X11 clients are accepted before this.
- let win = conn.generate_id()?;
+ let win = conn.generate_id();
conn.create_window(
screen.root_depth,
win,
@@ -132,27 +169,30 @@ impl X11State {
x11rb::COPY_FROM_PARENT,
&Default::default(),
)?;
- conn.set_selection_owner(win, atoms.WM_S0, x11rb::CURRENT_TIME)?;
+ LazyAtom::on_reply(&wm_s0, &conn, move |conn, atom| {
+ conn.set_selection_owner(win, atom, x11rb::CURRENT_TIME).unwrap();
+ });
// XWayland wants us to do this to function properly...?
- conn.composite_redirect_subwindows(screen.root, Redirect::Manual)?;
-
- conn.flush()?;
+ let root = screen.root;
+ conn.on_extension_information(composite::X11_EXTENSION_NAME, move |conn, res| {
+ assert_eq!(Ok(()), res, "Xwayland does not support the composite extension?!");
+ conn.composite_redirect_subwindows(root, Redirect::Manual).unwrap();
+ });
- let conn = Rc::new(conn);
let wm = Self {
- conn: Rc::clone(&conn),
- atoms,
unpaired_surfaces: Default::default(),
token,
window_map,
log,
+ wl_surface_id,
+ conn: None,
};
- Ok((wm, X11Source::new(conn)))
+ Ok((wm, conn))
}
- fn handle_event(&mut self, event: Event, client: &Client) -> Result<(), ReplyOrIdError> {
+ fn handle_event(&mut self, event: Event, client: &Client, conn: &X11Connection, rc: Rc<RefCell<Self>>) -> Result<(), ReplyOrIdError> {
debug!(self.log, "X11: Got event {:?}", event);
match event {
Event::ConfigureRequest(r) => {
@@ -179,35 +219,20 @@ impl X11State {
if r.value_mask & u16::from(ConfigWindow::BorderWidth) != 0 {
aux = aux.border_width(u32::try_from(r.border_width).unwrap());
}
- self.conn.configure_window(r.window, &aux)?;
+ conn.configure_window(r.window, &aux)?;
}
Event::MapRequest(r) => {
// Just grant the wish
- self.conn.map_window(r.window)?;
+ conn.map_window(r.window)?;
}
Event::ClientMessage(msg) => {
- if msg.type_ == self.atoms.WL_SURFACE_ID {
+ if msg.type_ == self.wl_surface_id.borrow().unwrap() {
// We get a WL_SURFACE_ID message when Xwayland creates a WlSurface for a
// window. Both the creation of the surface and this client message happen at
// roughly the same time and are sent over different sockets (X11 socket and
// wayland socket). Thus, we could receive these two in any order. Hence, it
// can happen that we get None below when X11 was faster than Wayland.
- let location = {
- match self.conn.get_geometry(msg.window)?.reply() {
- Ok(geo) => (geo.x.into(), geo.y.into()),
- Err(err) => {
- error!(
- self.log,
- "Failed to get geometry for {:x}, perhaps the window was already destroyed?",
- msg.window;
- "err" => format!("{:?}", err),
- );
- (0, 0)
- }
- }
- };
-
let id = msg.data.as_data32()[0];
let surface = client.get_resource::<WlSurface>(id);
info!(
@@ -216,9 +241,9 @@ impl X11State {
);
match surface {
None => {
- self.unpaired_surfaces.insert(id, (msg.window, location));
+ self.unpaired_surfaces.insert(id, msg.window);
}
- Some(surface) => self.new_window(msg.window, surface, location),
+ Some(surface) => self.new_window(msg.window, surface, Rc::clone(&rc)),
}
}
}
@@ -227,19 +252,40 @@ impl X11State {
Ok(())
}
- fn new_window(&mut self, window: Window, surface: WlSurface, location: (i32, i32)) {
+ fn new_window(&mut self, window: Window, surface: WlSurface, clone: Rc<RefCell<Self>>) {
debug!(self.log, "Matched X11 surface {:x?} to {:x?}", window, surface);
- if self.token.give_role_with(&surface, X11SurfaceRole).is_err() {
- // It makes no sense to post a protocol error here since that would only kill Xwayland
- error!(self.log, "Surface {:x?} already has a role?!", surface);
- return;
- }
+ let conn = self.conn.unwrap();
+ let conn = unsafe { &*conn };
+ conn.get_geometry(window)
+ .unwrap()
+ .on_reply(conn, move |_, reply| {
+ let wm = clone.borrow_mut();
+
+ let location = match reply {
+ Ok(geo) => (geo.x.into(), geo.y.into()),
+ Err(err) => {
+ error!(
+ wm.log,
+ "Failed to get geometry for {:x}, perhaps the window was already destroyed?",
+ window;
+ "err" => format!("{:?}", err),
+ );
+ (0, 0)
+ }
+ };
+
+ if wm.token.give_role_with(&surface, X11SurfaceRole).is_err() {
+ // It makes no sense to post a protocol error here since that would only kill Xwayland
+ error!(wm.log, "Surface {:x?} already has a role?!", surface);
+ return;
+ }
- let x11surface = X11Surface { surface };
- self.window_map
- .borrow_mut()
- .insert(Kind::X11(x11surface), location);
+ let x11surface = X11Surface { surface };
+ wm.window_map
+ .borrow_mut()
+ .insert(Kind::X11(x11surface), location);
+ });
}
}
@@ -248,11 +294,12 @@ pub fn commit_hook(surface: &WlSurface) {
// Is this the Xwayland client?
if let Some(client) = surface.as_ref().client() {
if let Some(x11) = client.data_map().get::<Rc<RefCell<X11State>>>() {
+ let clone = Rc::clone(&x11);
let mut inner = x11.borrow_mut();
// Is the surface among the unpaired surfaces (see comment next to WL_SURFACE_ID
// handling above)
- if let Some((window, location)) = inner.unpaired_surfaces.remove(&surface.as_ref().id()) {
- inner.new_window(window, surface.clone(), location);
+ if let Some(window) = inner.unpaired_surfaces.remove(&surface.as_ref().id()) {
+ inner.new_window(window, surface.clone(), clone);
}
}
}
diff --git a/anvil/src/xwayland/x11_connection.rs b/anvil/src/xwayland/x11_connection.rs
new file mode 100644
index 00000000..2a86a257
--- /dev/null
+++ b/anvil/src/xwayland/x11_connection.rs
@@ -0,0 +1,606 @@
+use std::{
+ cell::{Cell, RefCell},
+ collections::{hash_map::{Entry, HashMap}, VecDeque},
+ convert::{TryFrom, TryInto},
+ io::{Error as IOError, ErrorKind, IoSlice, Read, Result as IOResult, Write},
+ os::unix::net::UnixStream,
+ rc::Rc,
+};
+
+use smithay::reexports::calloop::{
+ generic::Generic,
+ EventSource, Interest, Mode, Poll, Readiness, Token,
+};
+
+use x11rb::{
+ connection::{DiscardMode, SequenceNumber, ReplyOrError, RequestConnection, RequestKind},
+ cookie::{Cookie, CookieWithFds, VoidCookie},
+ errors::{ConnectError, ConnectionError, ParseError, ReplyError, ReplyOrIdError},
+ protocol::{
+ Event,
+ xproto::{
+ ConnectionExt as _,
+ Setup,
+ SetupAuthenticate,
+ SetupFailed,
+ SetupRequest,
+ GE_GENERIC_EVENT,
+ KEYMAP_NOTIFY_EVENT,
+ },
+ },
+ utils::RawFdContainer,
+ x11_utils::{ExtensionInformation, ExtInfoProvider, Serialize, TryParse, X11Error},
+};
+
+pub type BufWithFds = x11rb::connection::BufWithFds<Vec<u8>>;
+
+/// Extension trait for registering a callback that receives a request's reply instead of
+/// waiting for it.
+pub trait CookieExt<R: for<'a> TryFrom<&'a [u8], Error = ParseError>> {
+ /// Consumes the cookie and registers `callback` on `conn`; the callback is given the
+ /// connection and either the parsed reply or the error.
+ fn on_reply<C>(self, conn: &X11Connection, callback: C)
+ where
+ C: for<'b> FnOnce(&'b X11Connection, Result<R, ReplyError>) + 'static;
+}
+
+impl<R, Conn> CookieExt<R> for Cookie<'_, Conn, R>
+where
+    R: for<'a> TryFrom<&'a [u8], Error = ParseError>,
+    Conn: RequestConnection + ?Sized,
+{
+    /// Registers `callback` for this cookie's sequence number on `conn`.
+    ///
+    /// The raw reply bytes are parsed into `R` before the callback runs; a parse failure is
+    /// reported to the callback as a `ReplyError::ConnectionError`.
+    fn on_reply<C>(self, conn: &X11Connection, callback: C)
+    where
+        C: for<'b> FnOnce(&'b X11Connection, Result<R, ReplyError>) + 'static,
+    {
+        let sequence = self.sequence_number();
+        // The cookie is forgotten rather than dropped, presumably so that its destructor
+        // does not ask the connection to discard the reply we are about to wait for.
+        std::mem::forget(self);
+        conn.on_reply(
+            sequence,
+            Box::new(move |conn: &X11Connection, raw: Result<Vec<u8>, ReplyError>| {
+                let parsed = raw.and_then(|bytes| {
+                    R::try_from(&bytes[..]).map_err(|err| ReplyError::ConnectionError(err.into()))
+                });
+                callback(conn, parsed)
+            }),
+        );
+    }
+}
+
+/// A callback-driven X11 connection over a Unix socket that also acts as a calloop
+/// `EventSource`.
+pub struct X11Connection {
+ /// The socket, wrapped so that it can be registered with the event loop.
+ stream: Generic<UnixStream>,
+ /// The `Setup` received from the server during the handshake in `new`.
+ setup: Setup,
+ /// Counter from which `generate_id` derives the next resource ID.
+ next_id: Cell<u32>,
+ /// Sequence number of the most recently written request.
+ sequence_write: Cell<SequenceNumber>,
+ /// When `sequence_write` reaches this value, `write_request` inserts a GetInputFocus
+ /// request before a request without reply.
+ sequence_need_sync: Cell<SequenceNumber>,
+ /// Full-width sequence number of the most recently read packet.
+ sequence_read: Cell<SequenceNumber>,
+ /// Bytes read from the socket that have not yet been split into packets.
+ read_buffer: RefCell<Vec<u8>>,
+ /// Request bytes that have not yet been written to the socket.
+ write_buffer: RefCell<Vec<u8>>,
+ /// State of the extension queries; shared with the reply handlers that update it.
+ extension_manager: Rc<RefCell<ExtensionManager>>,
+ /// Pending reply handlers, each paired with the sequence number it waits for.
+ replies: RefCell<VecDeque<(SequenceNumber, Box<dyn for<'b> FnOnce(&'b X11Connection, Result<Vec<u8>, ReplyError>)>)>>,
+}
+
+impl X11Connection {
+    /// Performs the X11 connection handshake on `stream` and wraps it in a connection.
+    ///
+    /// The handshake uses blocking I/O: Xwayland has just started and we are its only
+    /// client, so this should be quick and a non-blocking handshake is not worth the extra
+    /// complexity. Afterwards the socket is switched to non-blocking mode.
+    ///
+    /// # Errors
+    /// Returns a `ConnectError` if changing the blocking mode or the handshake fails.
+    pub fn new(mut stream: UnixStream) -> Result<Self, ConnectError> {
+        stream.set_nonblocking(false)?;
+        let server_setup = setup(&mut stream)?;
+        stream.set_nonblocking(true)?;
+
+        let initial_sync_point = SequenceNumber::from(u16::max_value());
+        Ok(Self {
+            stream: Generic::new(stream, Interest::Both, Mode::Edge),
+            setup: server_setup,
+            next_id: Cell::new(0),
+            sequence_write: Cell::new(0),
+            sequence_need_sync: Cell::new(initial_sync_point),
+            sequence_read: Cell::new(0),
+            read_buffer: RefCell::default(),
+            write_buffer: RefCell::default(),
+            extension_manager: Rc::default(),
+            replies: RefCell::default(),
+        })
+    }
+
+ /// Returns the `Setup` that the server sent during the connection handshake.
+ pub fn setup(&self) -> &Setup {
+ &self.setup
+ }
+
+    /// Allocates a fresh resource ID from the range that the server assigned to this client.
+    ///
+    /// # Panics
+    /// Panics once every ID of the range has been handed out, since XC-MISC is not
+    /// implemented.
+    pub fn generate_id(&self) -> u32 {
+        let (id_base, id_mask) = (self.setup.resource_id_base, self.setup.resource_id_mask);
+        let id = self.next_id.get();
+
+        // The counter itself must stay within the mask. Checking `id | id_base` instead
+        // would not notice a counter that overflowed into the bits of `id_base` and would
+        // hand out already-used IDs again.
+        assert!(
+            id_mask != 0 && id <= id_mask,
+            "Exhausted available XIDs and no one implemented XC-MISC yet"
+        );
+
+        // The right-most set bit of the mask is the distance between two consecutive IDs.
+        let increment = id_mask & id_mask.wrapping_neg();
+        // Saturate instead of overflowing; a counter beyond the mask fails the assertion
+        // above on the next call.
+        self.next_id.set(id.saturating_add(increment));
+
+        id | id_base
+    }
+
+ fn on_reply(&self, sequence: SequenceNumber, callback: Box<dyn for<'b> FnOnce(&'b X11Connection, Result<Vec<u8>, ReplyError>)>) {
+ self.replies.borrow_mut().push_back((sequence, callback));
+ }
+
+    /// Queues `request` for sending, tries to flush it and returns its sequence number.
+    ///
+    /// `has_reply` states whether the server answers this request with a reply. If too many
+    /// requests without a reply were sent in a row, a GetInputFocus request is inserted
+    /// first so that the 16 bit sequence numbers on the wire stay unambiguous.
+    ///
+    /// # Errors
+    /// Returns a `ConnectionError` if sending the sync request or flushing fails.
+    fn write_request(&self, request: &[IoSlice<'_>], has_reply: bool) -> Result<SequenceNumber, ConnectionError> {
+        if !has_reply && self.sequence_write == self.sequence_need_sync {
+            // Register a no-op handler for the sync request. Without one, its reply would
+            // not be recognised as a reply when read and would be treated as an event.
+            self.get_input_focus()?.on_reply(self, |_, _| {});
+        }
+
+        let sequence = self.sequence_write.get() + 1;
+        self.sequence_write.set(sequence);
+
+        if has_reply {
+            self.sequence_need_sync.set(sequence + SequenceNumber::from(u16::max_value()));
+        }
+
+        {
+            // Keep the borrow local: `flush` borrows the write buffer again.
+            let mut write_buffer = self.write_buffer.borrow_mut();
+            for part in request {
+                write_buffer.extend_from_slice(part);
+            }
+        }
+        self.flush()?;
+
+        Ok(sequence)
+    }
+
+    /// Tries to write the buffered request bytes to the socket.
+    ///
+    /// Bytes that the socket does not accept right now stay in the write buffer; a full
+    /// socket (`WouldBlock`) is not an error.
+    ///
+    /// # Errors
+    /// Returns `WriteZero` if the socket accepts no bytes at all, or any other I/O error
+    /// that the write reports.
+    fn flush(&self) -> IOResult<()> {
+        let mut write_buffer = self.write_buffer.borrow_mut();
+        let mut written = 0;
+        let result = loop {
+            if written == write_buffer.len() {
+                // Nothing (left) to write. Since we are edge-triggered, calloop will stop
+                // informing us about writability.
+                break Ok(());
+            }
+            match (&self.stream.file).write(&write_buffer[written..]) {
+                Ok(0) => break Err(IOError::new(ErrorKind::WriteZero, "failed to write the buffered data")),
+                // Try again in another loop iteration
+                Ok(n) => written += n,
+                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break Ok(()),
+                // A signal interrupted the call before any byte was written; just retry.
+                Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
+                Err(e) => break Err(e),
+            }
+        };
+        // Remove everything that was written in one go, also on the error paths.
+        let _ = write_buffer.drain(..written);
+        result
+    }
+
+    /// Reads from the connection, dispatches replies and errors to their registered
+    /// handlers and finally calls `callback` once with all remaining packets parsed as
+    /// events.
+    ///
+    /// # Errors
+    /// Returns an error if reading from the socket fails, or whatever `callback` returns.
+    fn read<C>(&mut self, mut callback: C) -> Result<(), ReplyOrIdError>
+    where
+        C: FnMut(Vec<Result<Event, ParseError>>, &mut *const X11Connection) -> Result<(), ReplyOrIdError>
+    {
+        self.do_read().map_err(|err| ReplyOrIdError::ConnectionError(err.into()))?;
+        let mut events = Vec::new();
+        for packet in self.read_packets() {
+            // Everything has a sequence number. With one exception. :-(
+            // (The high bit only marks events that were generated via SendEvent.)
+            let sequence = if packet[0] & 0x7f == KEYMAP_NOTIFY_EVENT {
+                self.sequence_read.get()
+            } else {
+                // On the wire, the sequence number has 16 bits. That can wrap. Expand this
+                // to 64 bits that cannot possibly wrap.
+                let wire = u16::from_ne_bytes([packet[2], packet[3]]);
+                let high_bytes = self.sequence_read.get() & !SequenceNumber::from(u16::max_value());
+                let mut full_number = SequenceNumber::from(wire) | high_bytes;
+                if full_number < self.sequence_read.get() {
+                    full_number += SequenceNumber::from(u16::max_value()) + 1;
+                }
+                full_number
+            };
+            self.sequence_read.set(sequence);
+
+            let next_sync = sequence + SequenceNumber::from(u16::max_value());
+            if self.sequence_need_sync.get() < next_sync {
+                self.sequence_need_sync.set(next_sync);
+            }
+
+            // Only errors (0) and replies (1) can be the answer to a request; an event that
+            // happens to carry the same sequence number must not consume the handler.
+            let is_reply_or_error = packet[0] <= 1;
+            let handler = {
+                // The borrow has to end before the handler runs: handlers may send further
+                // requests and register new handlers, which borrows the queue again.
+                let mut replies = self.replies.borrow_mut();
+                let front_sequence = replies.front().map(|(seqno, _)| *seqno);
+                match front_sequence {
+                    Some(seqno) if seqno == sequence && is_reply_or_error => {
+                        replies.pop_front().map(|(_, handler)| handler)
+                    }
+                    Some(seqno) => {
+                        assert!(seqno >= sequence);
+                        None
+                    }
+                    None => None,
+                }
+            };
+
+            match handler {
+                Some(handler) => {
+                    let reply = if packet[0] == 0 {
+                        // This is an X11 error
+                        match self.parse_error(&packet[..]) {
+                            Ok(error) => Err(ReplyError::X11Error(error)),
+                            Err(error) => Err(ReplyError::ConnectionError(error.into())),
+                        }
+                    } else {
+                        Ok(packet)
+                    };
+                    handler(self, reply);
+                }
+                None => events.push(self.parse_event(&packet[..])),
+            }
+        }
+        let mut metadata: *const X11Connection = self;
+        callback(events, &mut metadata)
+    }
+
+    /// Reads as many bytes as possible from the stream into the read buffer.
+    ///
+    /// Returns `Ok(())` once the socket has no more data available (`WouldBlock`).
+    ///
+    /// # Errors
+    /// Returns `UnexpectedEof` if the connection was closed, or any other I/O error that
+    /// the read reports.
+    fn do_read(&mut self) -> IOResult<()> {
+        // Guard that resets the read buffer to the length that was actually read, no
+        // matter how this function is left.
+        struct BufferSize<'a>(&'a mut Vec<u8>, usize);
+
+        impl<'a> BufferSize<'a> {
+            fn new(vec: &'a mut Vec<u8>) -> Self {
+                let len = vec.len();
+                Self(vec, len)
+            }
+        }
+
+        impl Drop for BufferSize<'_> {
+            fn drop(&mut self) {
+                self.0.truncate(self.1);
+            }
+        }
+
+        let mut read_buffer = self.read_buffer.borrow_mut();
+        let mut buffer = BufferSize::new(&mut read_buffer);
+
+        loop {
+            // Make room for another chunk behind the bytes read so far.
+            buffer.0.resize(buffer.1 + 4096, 0);
+            match self.stream.file.read(&mut buffer.0[buffer.1..]) {
+                Ok(0) => return Err(IOError::new(ErrorKind::UnexpectedEof, "Connection closed")),
+                // Try again in another loop iteration
+                Ok(n) => buffer.1 += n,
+                Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(()),
+                // A signal interrupted the call before any byte was read; just retry.
+                Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
+                Err(e) => return Err(e),
+            }
+        }
+    }
+
+    /// Splits the read buffer into complete X11 packets and removes them from the buffer.
+    ///
+    /// Every packet is at least 32 bytes long. Replies and generic events additionally
+    /// carry a length field (bytes 4..8) that counts extra 4-byte units. An incomplete
+    /// packet at the end stays in the buffer.
+    fn read_packets(&mut self) -> Vec<Vec<u8>> {
+        const REPLY: u8 = 1;
+        const HEADER_LENGTH: usize = 32;
+
+        let mut buffer = self.read_buffer.borrow_mut();
+        let mut packets = Vec::new();
+        let mut consumed = 0;
+
+        loop {
+            let pending = &buffer[consumed..];
+            if pending.len() < HEADER_LENGTH {
+                break;
+            }
+
+            let kind = pending[0];
+            let has_length_field = kind == REPLY || kind & 0x7f == GE_GENERIC_EVENT;
+            let extra_length = if has_length_field {
+                let mut field = [0; 4];
+                field.copy_from_slice(&pending[4..8]);
+                4 * (u32::from_ne_bytes(field) as usize)
+            } else {
+                0
+            };
+
+            let packet_length = HEADER_LENGTH + extra_length;
+            if pending.len() < packet_length {
+                break;
+            }
+            packets.push(pending[..packet_length].to_vec());
+            consumed += packet_length;
+        }
+
+        let _ = buffer.drain(..consumed);
+        packets
+    }
+
+    /// Calls `callback` with `Ok(())` if the extension `extension_name` is present and with
+    /// `Err(())` if it is missing.
+    ///
+    /// If the answer is already known, the callback runs immediately; otherwise it runs
+    /// when the reply to the extension query is processed.
+    pub fn on_extension_information<C>(&self, extension_name: &'static str, callback: C)
+    where
+        C: for<'a> FnOnce(&'a X11Connection, Result<(), ()>) + 'static
+    {
+        // Look the state up under a short-lived shared borrow. The callback must not run
+        // while the extension manager is borrowed: sending an extension request from the
+        // callback borrows the manager again and would panic.
+        let known = match self.extension_manager.borrow().0.get(extension_name) {
+            Some(ExtState::Present(_)) => Some(Ok(())),
+            Some(ExtState::Missing) => Some(Err(())),
+            Some(ExtState::Requested(_)) | None => None,
+        };
+        match known {
+            Some(result) => callback(self, result),
+            None => {
+                let clone = Rc::clone(&self.extension_manager);
+                self.extension_manager
+                    .borrow_mut()
+                    .on_extension_information(self, extension_name, clone, Box::new(callback))
+            }
+        }
+    }
+}
+
+/// Lets the request functions generated by x11rb send their requests through this
+/// connection. Everything that would have to block while waiting for the server is
+/// unimplemented; replies are delivered through `CookieExt::on_reply` instead.
+impl RequestConnection for X11Connection {
+    type Buf = Vec<u8>;
+
+    /// Sends a request that has a reply. File descriptors are not supported.
+    fn send_request_with_reply<R>(
+        &self,
+        bufs: &[IoSlice<'_>],
+        fds: Vec<RawFdContainer>,
+    ) -> Result<Cookie<'_, Self, R>, ConnectionError>
+    where
+        R: for<'a> TryFrom<&'a [u8], Error = ParseError>
+    {
+        assert!(fds.is_empty());
+        let sequence = self.write_request(bufs, true)?;
+        Ok(Cookie::new(self, sequence))
+    }
+
+    /// Not supported: replies that carry file descriptors.
+    fn send_request_with_reply_with_fds<R>(
+        &self,
+        _bufs: &[IoSlice<'_>],
+        _fds: Vec<RawFdContainer>,
+    ) -> Result<CookieWithFds<'_, Self, R>, ConnectionError>
+    where
+        R: for<'a> TryFrom<(&'a [u8], Vec<RawFdContainer>), Error = ParseError>
+    {
+        unimplemented!()
+    }
+
+    /// Sends a request that has no reply. File descriptors are not supported.
+    fn send_request_without_reply(
+        &self,
+        bufs: &[IoSlice<'_>],
+        fds: Vec<RawFdContainer>,
+    ) -> Result<VoidCookie<'_, Self>, ConnectionError>
+    {
+        assert!(fds.is_empty());
+        let sequence = self.write_request(bufs, false)?;
+        Ok(VoidCookie::new(self, sequence))
+    }
+
+    /// Currently ignores the request to discard a reply and only prints a notice.
+    fn discard_reply(&self, sequence: SequenceNumber, kind: RequestKind, mode: DiscardMode) {
+        let _ = (sequence, kind, mode);
+        //todo!()
+        eprintln!("Ignoring discard_reply() call, this should be fixed eventually, but for now does not cause problems");
+    }
+
+    /// Starts querying the extension (if that did not happen yet) without waiting for the
+    /// answer.
+    fn prefetch_extension_information(
+        &self,
+        extension_name: &'static str,
+    ) -> Result<(), ConnectionError> {
+        self.on_extension_information(extension_name, |_, _| {});
+        Ok(())
+    }
+
+    /// Returns the already known information about an extension.
+    ///
+    /// # Panics
+    /// Panics if the answer to the extension query is not available yet, since this
+    /// connection cannot block to wait for it.
+    fn extension_information(
+        &self,
+        extension_name: &'static str,
+    ) -> Result<Option<ExtensionInformation>, ConnectionError> {
+        // A shared borrow is enough for a lookup.
+        self.extension_manager.borrow().extension_information(extension_name)
+    }
+
+    /// Not supported: this connection never blocks to wait for a reply.
+    fn wait_for_reply_or_raw_error(
+        &self,
+        _sequence: SequenceNumber,
+    ) -> Result<ReplyOrError<Vec<u8>>, ConnectionError> {
+        unimplemented!()
+    }
+
+    /// Not supported: this connection never blocks to wait for a reply.
+    fn wait_for_reply(
+        &self,
+        _sequence: SequenceNumber,
+    ) -> Result<Option<Vec<u8>>, ConnectionError> {
+        unimplemented!()
+    }
+
+    /// Not supported: this connection never blocks to wait for a reply.
+    fn wait_for_reply_with_fds_raw(
+        &self,
+        _sequence: SequenceNumber,
+    ) -> Result<ReplyOrError<BufWithFds, Vec<u8>>, ConnectionError> {
+        unimplemented!()
+    }
+
+    /// Not supported: this connection never blocks to wait for an error.
+    fn check_for_raw_error(
+        &self,
+        _sequence: SequenceNumber,
+    ) -> Result<Option<Vec<u8>>, ConnectionError> {
+        unimplemented!()
+    }
+
+    /// Not supported.
+    fn prefetch_maximum_request_bytes(&self) {
+        unimplemented!()
+    }
+
+    /// Returns the maximum request size in bytes, computed from the `Setup`, whose
+    /// `maximum_request_length` is multiplied by four here.
+    fn maximum_request_bytes(&self) -> usize {
+        4 * usize::from(self.setup.maximum_request_length)
+    }
+
+    /// Parses `error` using the currently known extension information.
+    fn parse_error(&self, error: &[u8]) -> Result<X11Error, ParseError> {
+        X11Error::try_parse(error, &*self.extension_manager.borrow())
+    }
+
+    /// Parses `event` using the currently known extension information.
+    fn parse_event(&self, event: &[u8]) -> Result<Event, ParseError> {
+        Event::parse(event, &*self.extension_manager.borrow())
+    }
+}
+
+/// Makes the connection usable as a calloop event source. Each dispatch hands the parsed
+/// events to the callback together with a raw pointer to the connection.
+impl EventSource for X11Connection {
+    type Event = Vec<Result<Event, ParseError>>;
+    type Metadata = *const X11Connection;
+    type Ret = Result<(), ReplyOrIdError>;
+
+    /// Flushes pending requests when the socket is writable and processes incoming packets
+    /// when it is readable. Errors that are not already I/O errors are wrapped in one.
+    fn process_events<C>(&mut self, readiness: Readiness, _token: Token, callback: C) -> IOResult<()>
+    where
+        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
+    {
+        if readiness.writable {
+            self.flush()?;
+        }
+        if !readiness.readable {
+            return Ok(());
+        }
+        match self.read(callback) {
+            Ok(()) => Ok(()),
+            Err(ReplyOrIdError::ConnectionError(ConnectionError::IOError(err))) => Err(err),
+            Err(err) => Err(IOError::new(ErrorKind::Other, err)),
+        }
+    }
+
+    /// Registers the underlying socket with `poll`.
+    fn register(&mut self, poll: &mut Poll, token: Token) -> IOResult<()> {
+        self.stream.register(poll, token)
+    }
+
+    /// Re-registers the underlying socket with `poll`.
+    fn reregister(&mut self, poll: &mut Poll, token: Token) -> IOResult<()> {
+        self.stream.reregister(poll, token)
+    }
+
+    /// Removes the underlying socket from `poll`.
+    fn unregister(&mut self, poll: &mut Poll) -> IOResult<()> {
+        self.stream.unregister(poll)
+    }
+}
+
+/// Performs the X11 connection handshake on a blocking `stream`.
+///
+/// Sends a `SetupRequest` without authorization data and reads the server's answer.
+///
+/// # Errors
+/// Returns a `ConnectError` on I/O failure, if the answer cannot be parsed, or if the
+/// server refuses the connection or asks for authentication.
+fn setup(stream: &mut UnixStream) -> Result<Setup, ConnectError> {
+    let request = SetupRequest {
+        byte_order: byte_order(),
+        protocol_major_version: 11,
+        protocol_minor_version: 0,
+        authorization_protocol_name: Vec::new(),
+        authorization_protocol_data: Vec::new(),
+    };
+    let request_bytes = request.serialize();
+    stream.write_all(&request_bytes)?;
+
+    // The first 8 bytes of the answer are always there; bytes 6 and 7 are read as the
+    // length of the remainder in 4-byte units.
+    let mut answer = vec![0; 8];
+    stream.read_exact(&mut answer[..])?;
+    let remaining = 4 * usize::from(u16::from_ne_bytes([answer[6], answer[7]]));
+    answer.reserve_exact(remaining);
+    answer.resize(8 + remaining, 0);
+    stream.read_exact(&mut answer[8..])?;
+
+    // The first byte tells which kind of answer this is.
+    match answer[0] {
+        0 => Err(ConnectError::SetupFailed(SetupFailed::try_parse(&answer[..])?.0)),
+        1 => Ok(Setup::try_parse(&answer[..])?.0),
+        2 => Err(ConnectError::SetupAuthenticate(SetupAuthenticate::try_parse(&answer[..])?.0)),
+        _ => Err(ParseError::InvalidValue.into()),
+    }
+}
+
+/// Returns the byte-order marker for the `SetupRequest`: `0x6c` (ASCII `l`) on
+/// little-endian targets and `0x42` (ASCII `B`) on big-endian targets.
+fn byte_order() -> u8 {
+    if cfg!(target_endian = "little") {
+        0x6c
+    } else {
+        0x42
+    }
+}
+
+/// Tracks, per extension name, whether the extension was queried and what the server
+/// answered.
+#[derive(Default)]
+struct ExtensionManager(HashMap<&'static str, ExtState>);
+
+/// What is known about a single extension.
+pub enum ExtState {
+ /// The query was sent but not answered yet; holds the callbacks waiting for the answer.
+ Requested(Vec<Box<dyn for<'a> FnOnce(&'a X11Connection, Result<(), ()>)>>),
+ /// The server supports the extension; holds its opcode and first event/error codes.
+ Present(ExtensionInformation),
+ /// The server does not support the extension.
+ Missing,
+}
+
+impl ExtensionManager {
+    /// Calls `callback` with `Ok(())` if the extension is present and `Err(())` if it is
+    /// missing, querying the server first if nothing is known about the extension yet.
+    ///
+    /// `self_clone` must refer to the `RefCell` that contains `self`; the reply handler
+    /// uses it to record the answer. Note that for an already answered query the callback
+    /// runs right here, while the caller still holds the mutable borrow of the manager.
+    ///
+    /// # Panics
+    /// Panics if the query cannot be sent; the reply handler panics if the server answers
+    /// the query with an error.
+    fn on_extension_information<C>(&mut self, conn: &X11Connection, extension_name: &'static str, self_clone: Rc<RefCell<ExtensionManager>>, callback: C)
+    where
+        C: for<'a> FnOnce(&X11Connection, Result<(), ()>) + 'static
+    {
+        let vacant = match self.0.entry(extension_name) {
+            Entry::Occupied(occupied) => {
+                match occupied.into_mut() {
+                    ExtState::Requested(waiting) => waiting.push(Box::new(callback)),
+                    ExtState::Present(_) => callback(conn, Ok(())),
+                    ExtState::Missing => callback(conn, Err(())),
+                }
+                return;
+            }
+            Entry::Vacant(vacant) => vacant,
+        };
+
+        // First time this extension is asked for: query the server.
+        conn.query_extension(extension_name.as_bytes())
+            .unwrap()
+            .on_reply(conn, move |conn, reply| {
+                let answer = reply.unwrap();
+                let (new_state, outcome) = if answer.present {
+                    let information = ExtensionInformation {
+                        major_opcode: answer.major_opcode,
+                        first_event: answer.first_event,
+                        first_error: answer.first_error,
+                    };
+                    (ExtState::Present(information), Ok(()))
+                } else {
+                    (ExtState::Missing, Err(()))
+                };
+                // The borrow ends with this statement, so the callbacks below run without
+                // the manager being borrowed.
+                let previous = self_clone.borrow_mut().0.insert(extension_name, new_state);
+                if let Some(ExtState::Requested(waiting)) = previous {
+                    waiting.into_iter().for_each(|waiter| waiter(conn, outcome));
+                } else {
+                    unreachable!()
+                }
+            });
+        vacant.insert(ExtState::Requested(vec![Box::new(callback)]));
+    }
+
+    /// Returns the information about a present extension, or `None` for a missing one.
+    ///
+    /// # Panics
+    /// Panics if the extension was never queried or the answer has not arrived yet.
+    fn extension_information(
+        &self,
+        extension_name: &'static str,
+    ) -> Result<Option<ExtensionInformation>, ConnectionError> {
+        match self.0.get(extension_name) {
+            Some(ExtState::Present(info)) => Ok(Some(*info)),
+            Some(ExtState::Missing) => Ok(None),
+            Some(ExtState::Requested(_)) | None => {
+                panic!("Extension information not (yet?) available")
+            }
+        }
+    }
+}
+
+/// Lookups that x11rb's parsing code uses to attribute opcodes, events and errors to the
+/// extensions that are known to be present.
+impl ExtInfoProvider for ExtensionManager {
+    /// Finds the present extension with exactly this major opcode.
+    fn get_from_major_opcode(&self, major_opcode: u8) -> Option<(&str, ExtensionInformation)> {
+        self.present_extensions()
+            .find(|(_, info)| info.major_opcode == major_opcode)
+    }
+
+    /// Finds the present extension with the largest `first_event` that is not above
+    /// `event_code`.
+    fn get_from_event_code(&self, event_code: u8) -> Option<(&str, ExtensionInformation)> {
+        self.present_extensions()
+            .filter(|(_, info)| info.first_event <= event_code)
+            .max_by_key(|(_, info)| info.first_event)
+    }
+
+    /// Finds the present extension with the largest `first_error` that is not above
+    /// `error_code`.
+    fn get_from_error_code(&self, error_code: u8) -> Option<(&str, ExtensionInformation)> {
+        self.present_extensions()
+            .filter(|(_, info)| info.first_error <= error_code)
+            .max_by_key(|(_, info)| info.first_error)
+    }
+}
+
+impl ExtensionManager {
+    /// Iterates over the name and information of every extension in the `Present` state.
+    fn present_extensions(&self) -> impl Iterator<Item = (&str, ExtensionInformation)> + '_ {
+        self.0.iter().filter_map(|(name, state)| match state {
+            ExtState::Present(info) => Some((*name, *info)),
+            _ => None,
+        })
+    }
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment