Skip to content

Instantly share code, notes, and snippets.

@AvnerCohen
Created February 22, 2015 16:25
Show Gist options
  • Save AvnerCohen/72540e2dc13a56b4be87 to your computer and use it in GitHub Desktop.
Save AvnerCohen/72540e2dc13a56b4be87 to your computer and use it in GitHub Desktop.
EventMachine + em-http-server + em-websocket: leaking memory like mad!
require 'bundler'
require 'socket'
Bundler.require
# Registry of client name => websocket connection; mutated by the websocket
# handlers and read by the HTTP /push handler.
WS_CLIENTS = {}
# Per-connection bookkeeping so a closing socket can find the name it
# registered under.
WS_CLIENTS_NAME_TO_SOCKET = {}
# Bind address: the IP of the last entry in the host's address list.
WS_HOST = Socket.ip_address_list.last.ip_unpack.first
WS_PORT = 8090
# The HTTP control endpoint listens on the port right after the websocket port.
HTTP_PORT = WS_PORT + 1
# For monitoring, write pid. Create ./tmp first so a fresh checkout does not
# crash on startup with Errno::ENOENT.
Dir.mkdir("./tmp") unless Dir.exist?("./tmp")
File.open("./tmp/em_websockets_server.pid", 'w') { |f| f.write(Process.pid) }
# HTTP control endpoint for the websocket server.
#
# Routes:
#   /ping                   -> liveness check, answers with a timestamped "pong"
#   /push?clientId=<name>   -> sends a "messageSent" notification to the
#                              websocket registered under <name> in WS_CLIENTS
#   anything else           -> 401 with a failure payload
#
# NOTE(review): @http_request_uri and @http_query_string are presumably set by
# the EM::HttpServer::Server base class before process_http_request runs —
# confirm against the em-http-server version in use.
class HTTPHandler < EM::HttpServer::Server
  # Dispatches +path+ to the matching handler; each branch sends exactly one
  # response through simple_response.
  #
  # @param path [String] the request URI path
  def route_request(path)
    case path
    when '/ping'
      simple_response("pong from ws_server, #{Time.now}", 200)
    when /\/push/
      push_to_client
    else
      simple_response('{"results": "fail_on_else"}', 401)
    end
  end

  # Entry point for a parsed request: routes any non-empty URI and answers an
  # empty one with the generic failure response (simple_response defaults).
  def process_http_request
    if @http_request_uri == ''
      simple_response
    else
      route_request(@http_request_uri)
    end
  end

  # Builds and sends a plain-text response with permissive CORS headers.
  #
  # @param text [String] response body
  # @param code [Integer] HTTP status code
  def simple_response(text = "{results: 'notok'}", code = 500)
    response = EM::DelegatedHttpResponse.new(self)
    response.content_type 'text/plain'
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Credentials'] = false
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS, PUT, PATCH, DELETE'
    response.headers['Access-Control-Allow-Headers'] = 'X-Requested-With,content-type'
    response.status = code
    response.content = text
    response.send_response
  end

  # Error hook: reports the exception to the client as a 500.
  #
  # @param e [Exception] the error raised while handling the request
  def http_request_errback(e)
    simple_response(e.inspect, 500)
  end

  private

  # Handles /push: looks up the target websocket by the clientId query
  # parameter and publishes a "messageSent" event to it.
  def push_to_client
    # to_s guards against a request without any query string, where
    # CGI.parse would otherwise raise on nil.
    params = CGI.parse(@http_query_string.to_s)
    client_id = params["clientId"].first
    socket = WS_CLIENTS[client_id]

    if socket.nil?
      simple_response('{"results": "no_user"}', 200)
    else
      PublishMessage.do(socket, "messageSent:#{client_id}:message_sent")
      simple_response("{\"results\": \"Published to #{client_id}\"}", 200)
    end
  end
end
# Helper for pushing a message to a websocket that may no longer exist.
module PublishMessage
  module_function

  # Sends +msg+ over +ws+; does nothing and returns nil when +ws+ is nil.
  #
  # @param ws [#send, nil] target connection
  # @param msg [String] payload to deliver
  # @return whatever +ws.send+ returns, or nil when there is no socket
  def do(ws, msg)
    return if ws.nil?

    ws.send(msg)
  end
end
# Stop the reactor on any of the usual termination signals.
%w[INT TERM QUIT].each do |signal_name|
  Signal.trap(signal_name) { EventMachine.stop }
end

# Reactor tuning, applied before EventMachine.run is entered.
EventMachine.epoll
EventMachine.set_descriptor_table_size(20000)
# Starts the reactor with two listeners: the websocket server on WS_PORT and
# the HTTP control endpoint (HTTPHandler) on HTTP_PORT.
#
# Websocket protocol (colon-separated text frames):
#   connected:<name>        registers this socket under <name>
#   typing:<target>:<who>   forwards "typing:<who>" to the socket of <target>
EventMachine.run do
  puts "Starting websockets server at socket://#{WS_HOST}:#{WS_PORT}"
  EventMachine::WebSocket.start(:host => WS_HOST, :port => "#{WS_PORT}") do |ws|
    # Key the bookkeeping by the connection object's identity. The previous key,
    # ws.get_pid, is the PID of a popen subprocess and is nil for ordinary
    # socket connections, so every client shared one slot and closed sockets
    # were never removed from WS_CLIENTS (the memory leak).
    socket_key = ws.object_id

    # Drops whatever name this socket registered, but only removes the
    # WS_CLIENTS entry if it still points at this socket (another connection
    # may have taken the same name over in the meantime).
    forget_registration = lambda do
      registered_name = WS_CLIENTS_NAME_TO_SOCKET.delete(socket_key)
      WS_CLIENTS.delete(registered_name) if WS_CLIENTS[registered_name].equal?(ws)
    end

    ws.onopen do |_handshake|
      # Placeholder: nothing to do until the client announces itself with a
      # "connected" message.
    end

    ws.onmessage do |msg|
      event_type, *data = msg.split(":")
      case event_type
      when 'typing'
        PublishMessage.do(WS_CLIENTS[data.first], "typing:#{data.last}")
      when 'connected'
        # A socket that re-registers under a new name must not leave its old
        # name behind.
        forget_registration.call
        WS_CLIENTS_NAME_TO_SOCKET[socket_key] = data.first
        WS_CLIENTS[data.first] = ws
      end
    end

    ws.onclose do
      forget_registration.call
    end
  end

  puts "Starting http server at http://#{WS_HOST}:#{HTTP_PORT}"
  EventMachine.start_server(WS_HOST, "#{HTTP_PORT}", HTTPHandler)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment