Skip to content

Instantly share code, notes, and snippets.

@jsvd
Created April 20, 2021 10:06
Show Gist options
  • Save jsvd/7233c9496518bb185a81e507d6275049 to your computer and use it in GitHub Desktop.
Save jsvd/7233c9496518bb185a81e507d6275049 to your computer and use it in GitHub Desktop.
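# Sends one batch of JSON events over TLS (with a client certificate) using
# Lumberjack "2"-versioned frames and prints the sequence number acknowledged
# by the peer.
#
# Usage:
#
# ruby send_broken.rb <host> <port> <CA_file> <Cert_file> <Key_file>
# ruby send_broken.rb 127.0.0.1 5044 /Users/joaoduarte/certs/RootCA.crt /Users/joaoduarte/certs/Client-Root.crt /Users/joaoduarte/certs/Client-Root.key
#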
# encoding: utf-8
require "socket"
require "zlib"
require "json"
require "openssl"
module Lumberjack
  # Largest value that fits in the unsigned 32-bit sequence field of a frame.
  SEQUENCE_MAX = 2**32 - 1

  # TLS client that writes version "2" frames: a window-size frame ("W"),
  # followed by a compressed frame ("C") wrapping JSON data frames ("J"),
  # and then blocks until the peer answers with an ack frame ("A").
  class Socket
    attr_reader :sequence, :socket, :host, :last_ack

    # Opens the TLS connection immediately.
    #
    # @param host [String] peer host name or address
    # @param port [Integer] peer TCP port
    # @param ca_file [String] path of the CA bundle used to verify the peer
    #   (defaults to the third command-line argument)
    # @param cert_file [String] path of the client certificate
    #   (defaults to the fourth command-line argument)
    # @param key_file [String] path of the client private key
    #   (defaults to the fifth command-line argument)
    def initialize(host, port, ca_file: ARGV[2], cert_file: ARGV[3], key_file: ARGV[4])
      @host = host
      @ca_file = ca_file
      @cert_file = cert_file
      @key_file = key_file
      @sequence = 0
      @last_ack = 0
      connection_start(host, port)
    end

    # Sends an already-encoded payload and waits for the ack.
    #
    # @param payload [String] concatenated, uncompressed data frames
    # @param size [Integer] window size announced to the peer
    # @return [Integer] sequence number carried by the ack frame
    def write_sync_payload(payload, size)
      send_window_size(size)
      send_payload(compress_payload(payload))
      ack(size)
    end

    # Encodes the given events as JSON frames, sends them and waits for the ack.
    #
    # @param elements [Hash, Array<Hash>] a single event or a batch of events
    # @param opts [Hash] accepted for interface compatibility; not used
    # @return [Integer] sequence number carried by the ack frame
    def write_sync(elements, opts = {})
      elements = [elements] if elements.is_a?(Hash)
      payload = elements.map { |element| JsonEncoder.to_frame(element, inc) }.join
      write_sync_payload(payload, elements.size)
    end

    # Reads one frame from the peer and records its sequence number.
    #
    # @param size [Integer] accepted for interface compatibility; not used
    # @return [Integer] sequence number carried by the ack frame
    # @raise [RuntimeError] if the frame received is not an ack ("A") frame
    # @raise [EOFError] if the stream ends before the 4-byte sequence is read
    def ack(size)
      _version, type = read_version_and_type
      raise "Whoa we shouldn't get this frame: #{type}" if type != "A"
      @last_ack = read_last_ack
    end

    # Closes the TLS connection (and, through sync_close, the TCP socket).
    def close
      @socket.close if @socket && !@socket.closed?
    end

    private

    # Connects over TCP and performs the TLS handshake.
    def connection_start(host, port)
      tcp_socket = TCPSocket.new(host, port)
      begin
        ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, get_context)
        # Closing the TLS socket must also release the underlying descriptor.
        ssl_socket.sync_close = true
        ssl_socket.connect
      rescue StandardError
        # Do not leak the TCP socket when the context or handshake fails.
        tcp_socket.close
        raise
      end
      @socket = ssl_socket
    end

    # Builds the TLS context from the configured CA, certificate and key files.
    # NOTE(review): only the certificate chain is verified (VERIFY_PEER); no
    # hostname check is configured here - confirm that this is intended.
    def get_context
      context = OpenSSL::SSL::SSLContext.new
      context.ca_file = @ca_file
      # Read the files eagerly so that no File handle is left open.
      context.cert = OpenSSL::X509::Certificate.new(File.binread(@cert_file))
      # PKey.read detects the key algorithm instead of assuming RSA.
      context.key = OpenSSL::PKey.read(File.binread(@key_file))
      context.verify_mode = OpenSSL::SSL::VERIFY_PEER
      context
    end

    # Advances the sequence counter; after SEQUENCE_MAX it restarts at 1.
    def inc
      @sequence = @sequence >= Lumberjack::SEQUENCE_MAX ? 1 : @sequence + 1
    end

    # Writes the window-size frame: "2", "W", 32-bit big-endian size.
    def send_window_size(size)
      @socket.syswrite(["2", "W", size].pack("AAN"))
    end

    # Writes the whole payload, retrying after partial writes.
    def send_payload(payload)
      bytes_written = 0
      while bytes_written < payload.bytesize
        # sleep 7  # NOTE(review): presumably a debugging hook to stall mid-send
        bytes_written += @socket.syswrite(payload.byteslice(bytes_written..-1))
      end
    end

    # Wraps the deflated payload in a compressed frame:
    # "2", "C", 32-bit big-endian length, deflated bytes.
    def compress_payload(payload)
      compress = Zlib::Deflate.deflate(payload)
      ["2", "C", compress.bytesize, compress].pack("AANA*")
    end

    def unacked_sequence_size
      sequence - (@last_ack + 1)
    end

    # Reads the two one-byte header fields of a frame; either may be nil
    # when the stream is already at EOF.
    def read_version_and_type
      version = @socket.read(1)
      type = @socket.read(1)
      [version, type]
    end

    # Reads the 32-bit big-endian sequence number of an ack frame.
    def read_last_ack
      raw = @socket.read(4)
      if raw.nil? || raw.bytesize < 4
        raise EOFError, "connection closed while reading the ack sequence number"
      end
      raw.unpack("N").first
    end
  end

  # Encodes events as JSON data frames.
  module JsonEncoder
    # @param hash [Hash] event to serialize
    # @param sequence [Integer] sequence number stored in the frame
    # @return [String] "2", "J", 32-bit sequence, 32-bit JSON byte length, JSON
    def self.to_frame(hash, sequence)
      json = ::JSON.dump(hash)
      ["2", "J", sequence, json.bytesize, json].pack("AANNA*")
    end
  end # JsonEncoder
end
# Entry point: sends one batch of ten sample events and prints the sequence
# number acknowledged by the peer.
if ARGV.size < 5
  # Fail early with a readable message instead of a TypeError deep in the client.
  abort "usage: ruby #{$PROGRAM_NAME} <host> <port> <CA_file> <Cert_file> <Key_file>"
end

begin
  # Integer() rejects a malformed port instead of silently turning it into 0.
  port = Integer(ARGV[1], 10)
rescue ArgumentError
  abort "invalid port: #{ARGV[1].inspect}"
end

puts "sending data to #{ARGV[0]}:#{ARGV[1]}"
batch = Array.new(10) { |i| { "id" => i, "message" => "hey" * 50 } }
c = Lumberjack::Socket.new(ARGV[0], port)
begin
  puts c.write_sync(batch)
ensure
  # Release the connection even when the write or the ack fails.
  c.socket.close
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment