Last active
July 30, 2018 15:09
-
-
Save Ladas/438f830635d7f59873c28fbee7cc4e38 to your computer and use it in GitHub Desktop.
Collector/Persistor split
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# require 'manageiq_performance' | |
# Generates synthetic inventory for the named EMS and hands it over in batches.
#
# For every entity type in turn, +total_elements+ records are built through
# process_entity, which offers the persister to send_or_update after each
# record. Whatever is left in the last persister is flushed at the end with
# the special :rest counter.
#
# @param ems_name [String] name used to look up the ExtManagementSystem
# @param total_elements [Integer] number of records generated per entity type
# @param batch_size [Integer] counter threshold forwarded to send_or_update
# @return [Array] the [persister, count] pair returned by the final flush
# @raise [ArgumentError] when no ExtManagementSystem has the given name
def generate_batches_od_data(ems_name:, total_elements:, batch_size: 1000)
  ems = ExtManagementSystem.find_by(:name => ems_name)
  # find_by yields nil when nothing matches; fail here with a clear message
  # instead of passing nil on to the persister.
  raise ArgumentError, "ExtManagementSystem named #{ems_name.inspect} not found" if ems.nil?

  persister = ManageIQ::Providers::Amazon::Inventory::Persister::TargetCollection.new(ems, ems)
  count     = 1

  # :key_pair generation is currently disabled; add it to this list (it then
  # also becomes the label of the final flush) to enable it.
  entity_names = %i[vm image availability_zone flavor orchestration_stack]
  entity_names.each do |entity_name|
    persister, count = process_entity(ems, entity_name, persister, count, total_elements, batch_size)
  end

  # Send or update the rest, i.e. the final batch that is smaller than the batch size.
  send_or_update(ems, entity_names.last, persister, :rest, batch_size)
end
# Builds +total_elements+ records of a single entity type.
#
# Each index from 1 to +total_elements+ is passed to the matching
# parse_<entity_name> method; after every record the current persister and
# counter are handed to send_or_update, whose result replaces them.
#
# @param ems the manager forwarded to send_or_update
# @param entity_name [Symbol] selects the parse_<entity_name> method
# @param starting_persister persister to start filling
# @param starting_count counter value to start from
# @param total_elements [Integer] number of records to build
# @param batch_size [Integer] threshold forwarded to send_or_update
# @return [Array] the [persister, count] pair after the last record
def process_entity(ems, entity_name, starting_persister, starting_count, total_elements, batch_size)
  parser_name       = "parse_#{entity_name}"
  current_persister = starting_persister
  current_count     = starting_count

  1.upto(total_elements) do |position|
    send(parser_name, position, current_persister)
    current_persister, current_count =
      send_or_update(ems, entity_name, current_persister, current_count, batch_size)
  end

  [current_persister, current_count]
end
# Either keeps collecting into the current persister or enqueues it.
#
# When +count+ is :rest or has reached +batch_size+, the persister is put on
# the RefreshQueue and a brand new persister with the counter reset to 1 is
# returned. Otherwise the same persister comes back with the counter advanced
# (by 2 for :vm, by 1 for every other entity).
#
# @param ems the manager used to construct the replacement persister
# @param entity_name [Symbol] entity currently being generated
# @param persister the persister holding the collected data
# @param count [Integer, Symbol] running counter, or :rest to force a flush
# @param batch_size [Integer] counter value at which a flush happens
# @return [Array] the [persister, count] pair to continue with
def send_or_update(ems, entity_name, persister, count, batch_size)
  flush_now = count == :rest || count >= batch_size

  unless flush_now
    step = entity_name == :vm ? 2 : 1
    return [persister, count + step]
  end

  ############################ Replace by sending to kafka and use the saving code on the other side START #########
  # persister = ManagerRefresh::Inventory::Persister.from_yaml(persister.to_yaml)
  puts "#{Time.now.utc} enqueueing :#{entity_name}, :count => #{count}"
  RefreshQueue.enqueue!(persister)
  puts "#{Time.now.utc} enqueued"

  # Create a new persister so the old one, with its data, can be GCed.
  fresh_persister = ManageIQ::Providers::Amazon::Inventory::Persister::TargetCollection.new(ems, ems)
  [fresh_persister, 1]
end
# Builds one orchestration stack record named after +index+.
#
# For indexes above 2 the stack points at the previous stack
# ("stack_<index - 1>") as its parent via a lazy reference; otherwise the
# parent is nil.
#
# @param index [Integer] sequence number used in all generated values
# @param persister object exposing the orchestration_stacks collection
# @return whatever orchestration_stacks.build returns
def parse_orchestration_stack(index, persister)
  parent_stack =
    if index > 2
      persister.orchestration_stacks.lazy_find("stack_#{index - 1}")
    end

  stack = "stack_#{index}"
  persister.orchestration_stacks.build(
    ems_ref:       stack,
    name:          "#{stack}_name",
    description:   "#{stack}_description",
    status:        "#{stack}_ok",
    status_reason: "#{stack}_status_reason",
    parent:        parent_stack
  )
end
# Builds one VM record named after +index+, then its hardware.
#
# Related records (availability zone, flavor, image, orchestration stack) are
# referenced lazily by their "<kind>_<index>" refs. The :location attribute is
# a lazy lookup in the networks collection, keyed by the VM's hardware and the
# "public" description, reading :hostname with 'unknown' as the default.
#
# @param index [Integer] sequence number used in all generated values
# @param persister object exposing the inventory collections used below
# @return whatever parse_hardware returns
def parse_vm(index, persister)
  vm_reference = persister.vms.lazy_find("instance_#{index}")

  persister.vms.build(
    ems_ref:             "instance_#{index}",
    uid_ems:             "instance_#{index}_uid_ems",
    name:                "instance_#{index}_name",
    vendor:              "amazon",
    raw_power_state:     "instance_#{index} status",
    # Original note: Time.now (instead of nil) causes the data to be updated
    # again on the second and later refreshes.
    boot_time:           Time.now,
    availability_zone:   persister.availability_zones.lazy_find("az_#{index}"),
    flavor:              persister.flavors.lazy_find("flavor_#{index}"),
    genealogy_parent:    persister.miq_templates.lazy_find("image_#{index}"),
    # key_pairs:         [persister.key_pairs.lazy_find("key_pair_#{index}")],
    location:            persister.networks.lazy_find(
      { hardware: persister.hardwares.lazy_find(vm_or_template: vm_reference), description: "public" },
      { key: :hostname, default: 'unknown' }
    ),
    orchestration_stack: persister.orchestration_stacks.lazy_find("stack_#{index}")
  )

  parse_hardware(index, persister)
end
# Builds one image (template) record named after +index+.
#
# @param index [Integer] sequence number used in all generated values
# @param persister object exposing the miq_templates collection
# @return whatever miq_templates.build returns
def parse_image(index, persister)
  image = "image_#{index}"

  persister.miq_templates.build(
    ems_ref:            image,
    uid_ems:            "#{image}_uid_ems",
    name:               "#{image}_name",
    location:           "#{image}_location",
    vendor:             "amazon",
    raw_power_state:    "never",
    template:           true,
    publicly_available: true
  )
end
# Builds one flavor record named after +index+; everything except the
# ref, name and description is a fixed value.
#
# @param index [Integer] sequence number used in the generated strings
# @param persister object exposing the flavors collection
# @return whatever flavors.build returns
def parse_flavor(index, persister)
  flavor = "flavor_#{index}"

  persister.flavors.build(
    ems_ref:                  flavor,
    name:                     "#{flavor}_name",
    description:              "#{flavor}_description",
    enabled:                  true,
    cpus:                     1,
    cpu_cores:                1,
    memory:                   1024,
    supports_32_bit:          true,
    supports_64_bit:          true,
    supports_hvm:             true,
    supports_paravirtual:     false,
    block_storage_based_only: true,
    cloud_subnet_required:    true,
    ephemeral_disk_size:      10,
    ephemeral_disk_count:     1
  )
end
# Builds one availability zone record named after +index+.
#
# @param index [Integer] sequence number used in the generated strings
# @param persister object exposing the availability_zones collection
# @return whatever availability_zones.build returns
def parse_availability_zone(index, persister)
  zone = "az_#{index}"

  persister.availability_zones.build(ems_ref: zone, name: "#{zone}_name")
end
# Builds the hardware record for VM "instance_<index>", then its disks.
#
# The hardware is tied to the VM through a lazy reference; :guest_os is a
# lazy lookup of the :guest_os key on the hardware of image "image_<index>".
# All remaining values are fixed.
#
# @param index [Integer] sequence number of the owning VM
# @param persister object exposing the hardwares, vms and miq_templates collections
# @return whatever parse_disks returns
def parse_hardware(index, persister)
  persister.hardwares.build(
    vm_or_template:       persister.vms.lazy_find("instance_#{index}"),
    bitness:              64,
    virtualization_type:  "hvm",
    root_device_type:     "root_device_type",
    cpu_sockets:          4,
    cpu_cores_per_socket: 1,
    cpu_total_cores:      6,
    memory_mb:            600,
    disk_capacity:        200,
    manufacturer:         "hardware_#{index}",
    guest_os:             persister.hardwares.lazy_find(
      persister.miq_templates.lazy_find("image_#{index}"),
      key: :guest_os
    )
  )

  parse_disks(index, persister)
end
# Builds the two disk records ('sda' and 'sda1') for VM "instance_<index>".
#
# Both disks reference the VM's hardware lazily and differ only in device
# name and location.
#
# @param index [Integer] sequence number of the owning VM
# @param persister object exposing the disks, hardwares and vms collections
# @return whatever disks.build returns for the second disk
def parse_disks(index, persister)
  disk_specs = [
    { device_name: 'sda',  location: "dev/sda1_#{index}_1" },
    { device_name: 'sda1', location: "dev/sda2_#{index}_2" }
  ]

  built_disks = disk_specs.map do |spec|
    persister.disks.build(
      hardware:        persister.hardwares.lazy_find(persister.vms.lazy_find("instance_#{index}")),
      device_name:     spec[:device_name],
      device_type:     "disk",
      controller_type: "amazon",
      location:        spec[:location]
    )
  end

  built_disks.last
end
# Builds one key pair record named after +index+.
#
# @param index [Integer] sequence number used in the generated strings
# @param persister object exposing the key_pairs collection
# @return whatever key_pairs.build returns
def parse_key_pair(index, persister)
  key_pair = "key_pair_#{index}"

  persister.key_pairs.build(name: key_pair, fingerprint: "#{key_pair}_fingerprint")
end
# ActiveRecord::Base.logger = Logger.new(STDOUT)

# Benchmark driver. Command-line arguments (all optional):
#   ARGV[0] - EMS name (default "aws_ems_small")
#   ARGV[1] - number of records generated per entity type (default 100000)
#   ARGV[2] - batch size (default 1000)
manager_name   = ARGV[0] || "aws_ems_small"
total_elements = ARGV[1].try(:to_i) || 100000
batch_size     = ARGV[2].try(:to_i) || 1000

# ManageIQPerformance.profile do
_, timings = Benchmark.realtime_block(:ems_total_refresh) do
  generate_batches_od_data(
    :ems_name       => manager_name,
    :total_elements => total_elements,
    :batch_size     => batch_size
  )
end

# NOTE(review): five entity types are generated by generate_batches_od_data,
# but the reported total multiplies by 4 - confirm which number is intended.
message = "aws_ems LADAS_TOTAL_BENCH 1st refresh #{timings.inspect}, total_elements: #{total_elements*4}"
$log.info message
puts message

# Only :ems_total_refresh is measured in this script; :ems_refresh is present
# only if code running inside the block records it. Treat a missing timing as
# 0 so the report does not fail with NoMethodError on nil.
total_seconds   = timings[:ems_total_refresh].to_f
refresh_seconds = timings[:ems_refresh].to_f
puts "time: ems_refresh=#{refresh_seconds / 60}"
puts "time: other=#{(total_seconds - refresh_seconds) / 60 }"
# end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Creates the Amazon cloud manager record used by the benchmark scripts.
# Command-line arguments (both optional):
#   ARGV[0] - EMS name (default "aws_ems_small")
#   ARGV[1] - hostname (default "192.168.231.231")
manager_name = ARGV[0] || "aws_ems_small"
host_name    = ARGV[1] || "192.168.231.231"

# Log the SQL issued by ActiveRecord to standard output.
ActiveRecord::Base.logger = Logger.new(STDOUT)

manager_attributes = {
  name:            manager_name,
  hostname:        host_name,
  port:            443,
  zone:            Zone.first,
  provider_region: "us-east-1"
}
ems = ManageIQ::Providers::Amazon::CloudManager.create!(manager_attributes)
# ems.update_authentication(:bearer => {:auth_key => token, :save => true})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Persister worker: polls the RefreshQueue forever and persists every
# persister it receives. When the queue yields nothing, it waits 20 seconds
# before polling again.
#
# `loop do` replaces the original `while 0`, which only ran forever because
# 0 is truthy in Ruby.
loop do
  persister = RefreshQueue.dequeue!

  if persister.blank?
    puts "waiting....."
    sleep(20)
    next
  end

  puts "#{Time.now.utc} START persisting!"
  persister.persist!
  puts "#{Time.now.utc} FINISHED persisting!"
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment