Last active
February 3, 2022 00:25
-
-
Save fonsp/b5a27c49a53bf8bee5766c763350bfa0 to your computer and use it in GitHub Desktop.
Simple TCP server and client in Julia
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard-library dependencies for the client.
using Logging
using Sockets
import Sockets: connect

# `JULIA_DEBUG` names the module whose `@debug` messages should be shown.
ENV["JULIA_DEBUG"] = Main
"""
    🐸ServerConnection(; stream, read_task)

Client-side handle for one connection: the stream that messages are written
to, together with the task that reads from that stream.
"""
Base.@kwdef struct 🐸ServerConnection
    # Stream used by `send_message`, `close` and `isopen`.
    stream::IO
    # Task running the read loop; `wait` and `close` block on it.
    read_task::Task
end
# API: `send_message` and `create_connection`
"""
    send_message(sc::🐸ServerConnection, message::Union{Vector{UInt8},String})

Write `message` (raw bytes or a string) to the connection's stream and return
whatever `write` returns.
"""
function send_message(sc::🐸ServerConnection, message::Union{Vector{UInt8},String})
    # The argument is called `message`; the original body referenced an
    # undefined `msg`, and its type annotation had a misplaced brace.
    return write(sc.stream, message)
end
"""
    create_connection(; port::Integer, on_message::Function)

Connect to `port` and start an asynchronous task that keeps reading from the
stream for as long as it is open and readable. Every non-empty chunk returned
by `readavailable` is passed to `on_message`. An exception raised inside the
read task is logged with `@error` rather than propagated.

Returns a `🐸ServerConnection` holding the stream and the read task.
"""
function create_connection(; port::Integer, on_message::Function)
    stream = connect(port)
    @info "Client: Connected!"

    read_task = @async begin
        try
            while isopen(stream) && isreadable(stream)
                chunk = readavailable(stream)
                # Empty reads are skipped instead of being handed to the callback.
                isempty(chunk) || on_message(chunk)
            end
            @info "Client: stopped reading"
        catch err
            @error "Client: read error" exception = (err, catch_backtrace())
        end
    end

    return 🐸ServerConnection(; stream, read_task)
end
"Wrap `expr` in `try ... catch`. The exception is logged and then ignored." | |
macro trylog(expr, logmsg, loglevel = Logging.Warn)
    # `expr` is escaped so it runs in the caller's scope. Anything it throws is
    # reported through `@logmsg` at `loglevel`, together with the backtrace,
    # and is not rethrown.
    return quote
        try
            $(esc(expr))
        catch err
            @logmsg $(loglevel) $(logmsg) exception = (err, catch_backtrace())
        end
    end
end
"Shut down a server" | |
function Base.close(server::🐸ServerConnection)
    # Close the stream if it is still open, then wait for the read task.
    # Each step is wrapped in `@trylog`, so a failure is logged, not thrown.
    stream, reader = server.stream, server.read_task
    @trylog(isopen(stream) && close(stream), "Failed to close connection")
    return @trylog(wait(reader), "Something went wrong with the read task")
end
# Block until the connection's read task has finished.
function Base.wait(sc::🐸ServerConnection)
    return wait(sc.read_task)
end
# A connection counts as open exactly when its stream is open.
function Base.isopen(sc::🐸ServerConnection)
    return isopen(sc.stream)
end
###
# Example usage: connect to port 9090, log every chunk that arrives, send
# three messages one second apart, then close the connection.

conn = create_connection(;
    port = 9090,
    on_message = function (data)
        @info "Client: Received: " String(data)
    end
)

for text in ("Hi!!", "fonsi", "Hannesssss")
    sleep(1)
    send_message(conn, text)
end

sleep(1)
close(conn)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard-library dependencies for the server.
using Logging
using Sockets
import Sockets: connect, listen

# `JULIA_DEBUG` names the module whose `@debug` messages should be shown.
ENV["JULIA_DEBUG"] = Main
"Like @async except it prints errors to the terminal. 👶" | |
macro asynclog(expr)
    return quote
        # The body runs inside an `@async` task, so the exception is caught
        # here, printed to `stderr` with its stack trace, and then rethrown so
        # that the task still ends up failed.
        @async try
            $(esc(expr))
        catch err
            trace = stacktrace(catch_backtrace())
            showerror(stderr, err, trace)
            rethrow(err)
        end
    end
end
# Two structs | |
"""
    🐸Server(; port, accept_task, tcp_server)

Handle for a running server: the port it was created with, the task that
accepts incoming clients, and the underlying listening socket.
"""
Base.@kwdef struct 🐸Server
    # Port number passed to `listen`.
    port::UInt16
    # Task running the accept loop; `wait` and `close` block on it.
    accept_task::Task
    # Listening socket; `close` and `isopen` operate on it.
    tcp_server::Sockets.TCPServer
end
"""
    🐸ClientConnection(; stream, outbox, read_task, write_task)

Server-side handle for one connected client: its stream, the queue of
outgoing messages, and the two tasks that service the stream.
"""
Base.@kwdef struct 🐸ClientConnection
    # Stream of the accepted client.
    stream::IO
    # Queue of outgoing messages; `send_message` puts items here.
    outbox::Channel{Any}
    # Task that reads from `stream`.
    read_task::Task
    # Task that takes items from `outbox` and writes them to `stream`.
    write_task::Task
end
# Sentinel put into a client's outbox to tell its write task to stop.
# It is recognised by identity (`!==`), so any unique mutable object works.
const SHUTDOWN = Ref{Nothing}(nothing)
# API: `send_message` and `create_server` | |
"""
    send_message(client::🐸ClientConnection, message::Union{Vector{UInt8},String})

Queue `message` (raw bytes or a string) in the client's outbox; the client's
write task takes it from there and writes it to the stream. Returns the result
of `put!`.
"""
function send_message(client::🐸ClientConnection, message::Union{Vector{UInt8},String})
    # The original annotation `Union{Vector{UInt8,String}}` had a misplaced
    # brace and did not describe "bytes or string".
    return put!(client.outbox, message)
end
"""
    create_server(; port, on_message, on_disconnect)

Listen on `port` and accept clients in a background task.

For every accepted client a `🐸ClientConnection` is created together with two
tasks: a write task that takes queued messages from the client's outbox and
writes them to the socket, and a read task that passes every non-empty chunk
returned by `readavailable` to `on_message`.

# Keywords
- `port::Integer`: port to listen on (converted to `UInt16`).
- `on_message::Function`: called as `on_message(client, data)` for each
  non-empty chunk read from a client. Exceptions it throws are logged.
- `on_disconnect::Function`: called as `on_disconnect(client)` once the read
  loop for that client has ended. Exceptions it throws are logged.

# Returns
A `🐸Server` holding the port, the accept task and the listening socket.
"""
function create_server(;
    port::Integer,
    on_message::Function = (client, message) -> nothing,
    on_disconnect::Function = (client) -> nothing
)
    # Create a TCP server
    port = UInt16(port)
    tcp_server = listen(port)

    accept_task = @asynclog begin
        while isopen(tcp_server)
            # Wait for a new client to connect, accept the connection and store the stream.
            client_stream = try
                accept(tcp_server)
            catch ex
                # Only warn while the server is still open; a failure after it
                # has been closed is not reported.
                if isopen(tcp_server)
                    @warn "Failed to open client stream" exception = (ex, catch_backtrace())
                end
                nothing
            end
            client_stream isa IO || continue

            # Random six-letter id, used only to tell clients apart in the logs.
            client_id = String(rand('a':'z', 6))
            @info "Server: connected" client_id

            # The tasks below need the client object, which can only be built
            # after the tasks exist; this reference is filled in further down.
            client_ref = Ref{Union{Nothing,🐸ClientConnection}}(nothing)

            # This task takes items from the outbox and sends them to the client.
            write_task = Task() do
                while isopen(client_stream)
                    next_msg = take!(client_ref[].outbox)
                    @debug "Message to write!" next_msg
                    # Stop on the shutdown sentinel or once the stream has closed.
                    (next_msg === SHUTDOWN || !isopen(client_stream)) && break
                    try
                        @debug "writing..."
                        write(client_stream, next_msg)
                    catch ex
                        if isopen(client_stream)
                            @warn "Server: failed to write to client" client_id exception = (ex, catch_backtrace())
                        end
                    end
                end
                @info "Server: client outbox task finished" client_id
            end

            # This task reads from the client and sends the messages to `on_message`.
            read_task = Task() do
                # Same loop condition as the client's read loop: also stop once
                # the stream is no longer readable, so that a stream which is
                # still open but has nothing left to read cannot make this loop
                # spin on empty reads.
                while isopen(client_stream) && isreadable(client_stream)
                    incoming = try
                        # Read any available data. This call blocks until data arrives.
                        readavailable(client_stream)
                    catch e
                        @warn "Server: failed to read client data" client_id exception = (e, catch_backtrace())
                        nothing
                    end
                    if !isnothing(incoming) && !isempty(incoming)
                        @debug "Server: message from" client_id length(incoming)
                        try
                            # Let the user handle the message.
                            on_message(client_ref[], incoming)
                        catch e
                            @error "Server: failed to call on_message handler" client_id exception = (e, catch_backtrace())
                        end
                    end
                end

                # Reading has ended. Notify the user; a failing handler is
                # logged so that it cannot prevent the write task from being
                # stopped below.
                try
                    on_disconnect(client_ref[])
                catch e
                    @error "Server: failed to call on_disconnect handler" client_id exception = (e, catch_backtrace())
                end

                # Stop the write task: clear the queue, send the stop signal
                # and wait for the task to finish.
                outbox = client_ref[].outbox
                while isready(outbox)
                    take!(outbox)
                end
                put!(outbox, SHUTDOWN)
                wait(write_task)

                # Nothing will read from or write to this socket any more.
                isopen(client_stream) && close(client_stream)
                @info "Server: stopped reading client" client_id
            end

            client = 🐸ClientConnection(;
                stream = client_stream,
                outbox = Channel{Any}(256),
                read_task,
                write_task
            )
            client_ref[] = client
            schedule(read_task)
            schedule(write_task)
        end
        @info "Server: stopped accepting"
    end

    return 🐸Server(; port, accept_task, tcp_server)
end
"Wrap `expr` in `try ... catch`. The exception is logged and then ignored." | |
macro trylog(expr, logmsg, loglevel = Logging.Warn)
    # `expr` is escaped so it runs in the caller's scope. Anything it throws is
    # reported through `@logmsg` at `loglevel`, together with the backtrace,
    # and is not rethrown.
    return quote
        try
            $(esc(expr))
        catch err
            @logmsg $(loglevel) $(logmsg) exception = (err, catch_backtrace())
        end
    end
end
"Shut down a server" | |
function Base.close(server::🐸Server)
    # Close the listening socket if it is still open, then wait for the accept
    # task. Each step is wrapped in `@trylog`, so a failure is logged, not thrown.
    listener, acceptor = server.tcp_server, server.accept_task
    @trylog(isopen(listener) && close(listener), "Failed to close server")
    return @trylog(wait(acceptor), "Something went wrong with the accept task")
end
# Block until the server's accept task has finished.
function Base.wait(server::🐸Server)
    return wait(server.accept_task)
end
# Block until both of the client's tasks have finished: first the read task,
# then the write task.
function Base.wait(client::🐸ClientConnection)
    wait(client.read_task)
    return wait(client.write_task)
end
# A server counts as open exactly when its listening socket is open.
function Base.isopen(server::🐸Server)
    return isopen(server.tcp_server)
end
# A client connection counts as open exactly when its stream is open.
function Base.isopen(client::🐸ClientConnection)
    return isopen(client.stream)
end
###
# Example usage: answer every incoming message with "Hello, " followed by the
# message itself, and log when a client disconnects.

server = create_server(;
    port = 9090,
    on_message = function (client, message)
        to_send = "Hello, " * String(message)
        @info "to_send" to_send
        send_message(client, to_send)
    end,
    on_disconnect = function (client)
        @info "Server: client disconnected" objectid(client)
    end
)

sleep(30)
close(server)

# You probably don't want to sleep-and-close. Instead, use `wait` to block until the server is closed:
# wait(server)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment