Last active
January 17, 2018 15:56
-
-
Save Ladas/10d4f5c75502dbc3a8c3caeb114fc501 to your computer and use it in GitHub Desktop.
generate_metrics_parallel.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ActiveRecord::Base.logger = Logger.new(STDOUT)

# ---- Benchmark sizing knobs -------------------------------------------------
# Duration helpers (20.seconds, 4.hours.ago, ...) come from ActiveSupport,
# which is loaded by the Rails/ManageIQ environment this script runs in.
NUMBER_OF_CONTAINERS = 20_000 # 100_000
NUMBER_OF_PROJECTS = 100
LABELS_PER_CONTAINER_IMAGE = 20
# One metric row is generated every METRIC_PERIOD between METRICS_START/END.
METRIC_PERIOD = 20.seconds
METRICS_START = 4.hours.ago.utc
METRICS_END = Time.now.utc
# Have also archived containers?
NUMBER_OF_ARCHIVED_CONTAINERS = 20_000 # 100_000
ARCHIVED_METRICS_START = nil #2.days.ago # Set nil to disable creating metrics for archived containers
ARCHIVED_METRICS_END = METRICS_START - 1.hour
# Do only X containers per time, otherwise memory would go up a lot
# E.g. 1.days of metrics is 4320 metrics records per 1 container. So doing 100 containers at a time 432000 metrics
MEMORY_SAVING_CONTAINERS_BATCH = 500
NUMBER_OF_PARALLEL_PROCESSES = 8
# Resolve the target manager by name (first CLI argument), creating an
# OpenShift ContainerManager pointing at that host when none exists yet.
manager_name = ARGV[0] || "10.16.31.72"
ems = ExtManagementSystem.find_by(:name => manager_name)
unless ems
  # NOTE(review): bearer token hard-coded in the script — acceptable for a
  # throwaway benchmark gist, but never commit a live credential like this.
  token = 'eysJhbGciOiJSUzI1NiIsInR5cCI6IkpXVC.W_OMh-bUQfSqt0s2YoaR7kfANGHWlhRCQpjGqiecoBmEGRHuoc4TN1GFqGZH-YyEx-qIjDCXSBdhJVEXFfZ8bf_Mb_cXA3B4I9pHpXIZMvqvOR_BIdpcQHJSk3jjTt7JK3vxfU_MarefHpISuITEqHc9SNp2UrVpL4_XwmiN_jBazZ5OvtME1KJrhX9ZmvTgZws8weCUIrWHT-IyIfzSqtqjUkDKJ_nU_qY0y03xUz6t3ydmaohQbdaWf5TEIdjw6rJeNIHI43_vQRT_X1fuRHMp_IMBlVx8r-BoWlEoseKn6xnpAnE2vPAOyNaNtzu__wl-k3_Qumh8t059hrFwdA'.freeze
  ems = ManageIQ::Providers::Openshift::ContainerManager.create(
    :name     => manager_name,
    :hostname => manager_name,
    :port     => 443,
    :zone     => Zone.first)
  ems.update_authentication(:bearer => {:auth_key => token, :save => true})
end
# True when index +i+ falls into the archived range: the first
# NUMBER_OF_ARCHIVED_CONTAINERS indexes (1-based) are treated as archived.
def archived?(i)
  NUMBER_OF_ARCHIVED_CONTAINERS >= i
end
# Persister tuned for bulk seeding: never deletes, never updates, and looks up
# missing cross-references straight from the local DB, keeping batched inserts
# as cheap as possible.
class TestPersister < ManageIQ::Providers::Openshift::Inventory::Persister::ContainerManager
  def shared_options
    super.merge(
      :complete    => false, # To avoid deletion, since we insert in batches
      :create_only => true,  # Otherwise we would select a lot of data for update
      :strategy    => :local_db_find_missing_references
    )
  end
end
# Builds a fresh TestPersister for +ems+ with an extra :metrics collection
# bolted on (batch saver, insert-only — :complete => false skips the deletion
# sweep). Returns the persister; callers discard it after persist! to free
# memory.
def create_persister(ems)
  persister = TestPersister.new(ems)
  metrics_collection = ::ManagerRefresh::InventoryCollection.new(
    # Identity of a metric row: which resource, when, and at what interval.
    :manager_ref    => [:resource, :timestamp, :capture_interval_name],
    :name           => :metrics,
    :saver_strategy => :batch,
    :arel           => Metric.none,
    :complete       => false,
    :model_class    => Metric,
  )
  persister.collections[:metrics] = metrics_collection
  persister
end
# Seed all container projects up front (single process) so the parallel
# workers can lazy_find them by ems_ref.
start_time = Time.now.utc
first_persister = create_persister(ems)
(1..NUMBER_OF_PROJECTS).each do |i|
  first_persister.container_projects.build(
    :ems_ref => "container_project_ems_ref_#{i}",
    :name    => "container_project_name_#{i}",
  )
end
first_persister.persist!
# Degree of parallelism for the generator workers (method wrapper so it is
# callable from inside other methods without referencing the constant).
def number_of_processes
  NUMBER_OF_PARALLEL_PROCESSES
end
# Maps +block+ over +jobs+, each job running in its own forked OS process.
# The parallel gem is required lazily so the script only needs it once this
# point is actually reached.
def process_in_parallel(jobs, &block)
  require 'parallel'
  options = { :in_processes => number_of_processes }
  Parallel.map(jobs, options, &block)
end
# Split the full 1..(archived + active) index range into one contiguous slice
# per worker.
# NOTE(review): integer division means a total that is not divisible by
# number_of_processes yields one extra, smaller trailing slice — confirm that
# is acceptable.
jobs = (1..NUMBER_OF_ARCHIVED_CONTAINERS + NUMBER_OF_CONTAINERS).each_slice((NUMBER_OF_ARCHIVED_CONTAINERS + NUMBER_OF_CONTAINERS) / number_of_processes).to_a
process_in_parallel(jobs) do |indexes|
  # Each forked worker checks out its own DB connection from the pool.
  ActiveRecord::Base.connection_pool.with_connection do
    persister = create_persister(ems)
    batch_start_time = Time.now.utc
    indexes.each do |i|
      # One pod per index, round-robined across the pre-seeded projects.
      # Archived entities (low indexes) get deleted_on stamped.
      container_group = persister.container_groups.build(
        :ems_ref           => "container_ems_ref_#{i}",
        :name              => "container_name_#{i}",
        :container_project => persister.container_projects.lazy_find("container_project_ems_ref_#{(i % NUMBER_OF_PROJECTS) + 1}"),
        :deleted_on        => archived?(i) ? ARCHIVED_METRICS_END : nil
      )
      container_image = persister.container_images.build(
        :image_ref  => "container_image_ref_#{i}",
        :name       => "container_name_#{i}",
        :deleted_on => archived?(i) ? ARCHIVED_METRICS_END : nil
      )
      container = persister.containers.build(
        :ems_ref         => "container_ems_ref_#{i}",
        :name            => "container_name_#{i}",
        :container_group => container_group,
        :container_image => container_image,
        :started_at      => archived?(i) ? ARCHIVED_METRICS_START : METRICS_START,
        :finished_at     => archived?(i) ? ARCHIVED_METRICS_END : METRICS_END,
        :deleted_on      => archived?(i) ? ARCHIVED_METRICS_END : nil
      )
      (1..LABELS_PER_CONTAINER_IMAGE).each do |label_index|
        # NOTE(review): the collection key names "ContainerGroup" but the
        # resource attached is the container_image — confirm which model
        # these labels are meant to belong to.
        persister.collections[[:custom_attributes_for, "ContainerGroup", "labels"]].build(
          # :resource => container_group, # Should we have some labels for Container Groups
          :resource => container_image,
          :section  => "labels",
          :name     => "label_name_#{label_index}"
        )
      end
      # Metric timestamps for this container: archived containers use the
      # archived window (skipped entirely when ARCHIVED_METRICS_START is nil),
      # live ones use the current window.
      step_iterator = if archived?(i)
                        (ARCHIVED_METRICS_START.beginning_of_minute..ARCHIVED_METRICS_END).step_value(METRIC_PERIOD) unless ARCHIVED_METRICS_START.nil?
                      else
                        (METRICS_START.beginning_of_minute..METRICS_END).step_value(METRIC_PERIOD)
                      end
      # Beginning of the minute, so we start on aligned 00 seconds
      unless step_iterator.nil?
        step_iterator.each do |timestamp|
          next unless timestamp
          persister.metrics.build(
            :resource               => container,
            :timestamp              => timestamp.utc.iso8601.to_s,
            :capture_interval_name  => 'realtime',
            :derived_vm_numvcpus    => 4,
            :cpu_usage_rate_average => 70,
          )
        end
      end
      # Batch the persisting
      if MEMORY_SAVING_CONTAINERS_BATCH > 0 && (i % MEMORY_SAVING_CONTAINERS_BATCH) == 0
        persister.persist!
        inserted_records_count = persister.collections.values.sum { |x| x.created_records.count }
        updated_records_count = persister.collections.values.sum { |x| x.updated_records.count }
        # Drop the saved persister and start fresh so built records can be GCed.
        persister = create_persister(ems)
        batch_end_time = Time.now.utc
        batch_time = batch_end_time - batch_start_time
        puts "------------------------------"
        puts "Time taken building the batch: #{batch_time}s, inserted total #{inserted_records_count} and updated total #{updated_records_count} records"
        # Progress estimate assumes remaining batches behave like this one.
        container_left = indexes.last - i
        batches_left = container_left / MEMORY_SAVING_CONTAINERS_BATCH
        puts "Container left processing #{container_left}, total records left #{(inserted_records_count + updated_records_count) * batches_left}, estimated time left ==== #{(batches_left * batch_time) / (60.0)} ==== minutes"
        puts "------------------------------"
        batch_start_time = batch_end_time
      end
    end
    # Save the rest
    persister.persist!
  end
end
end_time = Time.now.utc
puts "Time taken total: #{end_time - start_time}s"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment