Skip to content

Instantly share code, notes, and snippets.

@Ladas
Last active January 17, 2018 15:56
Show Gist options
  • Save Ladas/10d4f5c75502dbc3a8c3caeb114fc501 to your computer and use it in GitHub Desktop.
Save Ladas/10d4f5c75502dbc3a8c3caeb114fc501 to your computer and use it in GitHub Desktop.
generate_metrics_parallel.rb
# Uncomment to send ActiveRecord logging to STDOUT.
# ActiveRecord::Base.logger = Logger.new(STDOUT)
# Number of live (non-archived) containers to generate; 100_000 is a larger alternative.
NUMBER_OF_CONTAINERS = 20_000 # 100_000
# Number of container projects; containers are spread over them by index modulo.
NUMBER_OF_PROJECTS = 100
# Number of label custom attributes built for each container image.
LABELS_PER_CONTAINER_IMAGE = 20
# Spacing between consecutive metric timestamps of one container.
METRIC_PERIOD = 20.seconds
# Time window covered by the metrics of live containers; also used as their started_at/finished_at.
METRICS_START = 4.hours.ago.utc
METRICS_END = Time.now.utc
# Number of archived containers generated in addition to the live ones; 100_000 is a larger alternative.
NUMBER_OF_ARCHIVED_CONTAINERS = 20_000 # 100_000
# Time window covered by the metrics of archived containers. ARCHIVED_METRICS_END is also
# used as the deleted_on timestamp of archived records.
ARCHIVED_METRICS_START = nil #2.days.ago # Set nil to disable creating metrics for archived containers
ARCHIVED_METRICS_END = METRICS_START - 1.hour
# Persist after every MEMORY_SAVING_CONTAINERS_BATCH containers, otherwise memory usage would grow a lot.
# E.g. 1 day of metrics at a 20 second period is 4320 metric records per container, so 100 containers
# per batch already means 432_000 metric records.
# A value of 0 disables the intermediate persists; each process then saves everything in one final persist.
MEMORY_SAVING_CONTAINERS_BATCH = 500
# Number of worker processes used to generate the data in parallel.
NUMBER_OF_PARALLEL_PROCESSES = 8
# Name of the container manager to populate: the first command line argument,
# or a default address when none is given.
manager_name = ARGV[0] || "10.16.31.72"
# Reuse an existing manager with that name; create one only when it is missing.
ems = ExtManagementSystem.find_by(:name => manager_name)
unless ems
  # SECURITY NOTE(review): this bearer token is hard-coded in the script. If it is
  # (or ever was) a real credential it should be revoked and supplied from outside
  # the source instead.
  token = 'eysJhbGciOiJSUzI1NiIsInR5cCI6IkpXVC.W_OMh-bUQfSqt0s2YoaR7kfANGHWlhRCQpjGqiecoBmEGRHuoc4TN1GFqGZH-YyEx-qIjDCXSBdhJVEXFfZ8bf_Mb_cXA3B4I9pHpXIZMvqvOR_BIdpcQHJSk3jjTt7JK3vxfU_MarefHpISuITEqHc9SNp2UrVpL4_XwmiN_jBazZ5OvtME1KJrhX9ZmvTgZws8weCUIrWHT-IyIfzSqtqjUkDKJ_nU_qY0y03xUz6t3ydmaohQbdaWf5TEIdjw6rJeNIHI43_vQRT_X1fuRHMp_IMBlVx8r-BoWlEoseKn6xnpAnE2vPAOyNaNtzu__wl-k3_Qumh8t059hrFwdA'.freeze
  # create! raises when validation fails, so the script stops here instead of
  # silently continuing with an unsaved manager (which plain create would return).
  ems = ManageIQ::Providers::Openshift::ContainerManager.create!(
    :name     => manager_name,
    :hostname => manager_name,
    :port     => 443,
    :zone     => Zone.first
  )
  ems.update_authentication(:bearer => {:auth_key => token, :save => true})
end
# Tells whether the container with the given index is generated as archived.
# Indexes up to and including NUMBER_OF_ARCHIVED_CONTAINERS are archived,
# all higher indexes are live containers.
#
# @param i [Integer] container index
# @return [Boolean]
def archived?(i)
  NUMBER_OF_ARCHIVED_CONTAINERS >= i
end
# Persister used by this generator: the Openshift container manager persister
# with its shared collection options overridden for batched, insert-only saving.
class TestPersister < ManageIQ::Providers::Openshift::Inventory::Persister::ContainerManager
  # @return [Hash] the parent's shared options with the generator overrides merged in
  def shared_options
    generator_overrides = {
      # Records are inserted in batches, so a collection must never be treated
      # as complete, otherwise records of other batches would be deleted.
      :complete    => false,
      # Insert only; otherwise a lot of data would be selected for update.
      :create_only => true,
      :strategy    => :local_db_find_missing_references
    }
    super.merge(generator_overrides)
  end
end
# Builds a fresh TestPersister for the given manager and registers an extra
# :metrics inventory collection on it, so Metric rows can be built and saved
# together with the rest of the inventory.
#
# @param ems the manager the persister is created for
# @return [TestPersister] the persister with the :metrics collection attached
def create_persister(ems)
  TestPersister.new(ems).tap do |new_persister|
    new_persister.collections[:metrics] = ::ManagerRefresh::InventoryCollection.new(
      :manager_ref    => [:resource, :timestamp, :capture_interval_name],
      :name           => :metrics,
      :saver_strategy => :batch,
      :arel           => Metric.none,
      :complete       => false,
      :model_class    => Metric
    )
  end
end
# Start of the whole run; used for the total time report at the end.
start_time = Time.now.utc
# Create all container projects up front in one persist, so the containers
# generated later can reference them by ems_ref.
first_persister = create_persister(ems)
1.upto(NUMBER_OF_PROJECTS) do |project_index|
  first_persister.container_projects.build(
    :ems_ref => "container_project_ems_ref_#{project_index}",
    :name    => "container_project_name_#{project_index}"
  )
end
first_persister.persist!
# Number of worker processes the generation is split across.
#
# @return [Integer] the value of NUMBER_OF_PARALLEL_PROCESSES
def number_of_processes
  NUMBER_OF_PARALLEL_PROCESSES
end
# Hands the jobs to Parallel.map, which runs the block once per job using
# number_of_processes worker processes.
#
# @param jobs [Array] the units of work, one block call per element
# @param block the work to perform for a single job
# @return whatever Parallel.map returns for the given jobs and block
def process_in_parallel(jobs, &block)
  # Loaded lazily, only when parallel processing is actually requested.
  require 'parallel'

  parallel_options = {:in_processes => number_of_processes}
  Parallel.map(jobs, parallel_options, &block)
end
# Split the container indexes (archived ones first, then live ones) into
# consecutive slices; every slice becomes one job for a worker process.
total_containers = NUMBER_OF_ARCHIVED_CONTAINERS + NUMBER_OF_CONTAINERS
# Integer division yields 0 when there are fewer containers than processes and
# each_slice raises ArgumentError for a slice size of 0, so use at least 1.
slice_size = [total_containers / number_of_processes, 1].max
jobs = (1..total_containers).each_slice(slice_size).to_a
# Every worker process receives one slice of container indexes. For each index
# it builds a container group, a container image, a container, the image labels
# and the metrics, and persists them in batches to keep memory usage bounded.
process_in_parallel(jobs) do |indexes|
  ActiveRecord::Base.connection_pool.with_connection do
    persister = create_persister(ems)
    batch_start_time = Time.now.utc

    indexes.each do |i|
      is_archived = archived?(i)
      # Archived records are marked as deleted at the end of the archived window.
      deleted_on = is_archived ? ARCHIVED_METRICS_END : nil
      started_at, finished_at =
        if is_archived
          [ARCHIVED_METRICS_START, ARCHIVED_METRICS_END]
        else
          [METRICS_START, METRICS_END]
        end

      # Projects were created up front; reference them lazily by ems_ref.
      project_ref = "container_project_ems_ref_#{(i % NUMBER_OF_PROJECTS) + 1}"
      container_project = persister.container_projects.lazy_find(project_ref)

      container_group = persister.container_groups.build(
        :ems_ref           => "container_ems_ref_#{i}",
        :name              => "container_name_#{i}",
        :container_project => container_project,
        :deleted_on        => deleted_on
      )
      container_image = persister.container_images.build(
        :image_ref  => "container_image_ref_#{i}",
        :name       => "container_name_#{i}",
        :deleted_on => deleted_on
      )
      container = persister.containers.build(
        :ems_ref         => "container_ems_ref_#{i}",
        :name            => "container_name_#{i}",
        :container_group => container_group,
        :container_image => container_image,
        :started_at      => started_at,
        :finished_at     => finished_at,
        :deleted_on      => deleted_on
      )

      # NOTE(review): the collection key names "ContainerGroup" while the resource
      # passed is the container image - confirm this is intended.
      # Open question from the original author: should container groups
      # (:resource => container_group) get labels as well?
      labels = persister.collections[[:custom_attributes_for, "ContainerGroup", "labels"]]
      1.upto(LABELS_PER_CONTAINER_IMAGE) do |label_index|
        labels.build(
          :resource => container_image,
          :section  => "labels",
          :name     => "label_name_#{label_index}"
        )
      end

      # Timestamps start at the beginning of the minute, so they are aligned to
      # 00 seconds. Archived containers get no metrics when
      # ARCHIVED_METRICS_START is nil.
      timestamps =
        if !is_archived
          (METRICS_START.beginning_of_minute..METRICS_END).step_value(METRIC_PERIOD)
        elsif ARCHIVED_METRICS_START.nil?
          nil
        else
          (ARCHIVED_METRICS_START.beginning_of_minute..ARCHIVED_METRICS_END).step_value(METRIC_PERIOD)
        end

      timestamps&.each do |timestamp|
        next unless timestamp

        persister.metrics.build(
          :resource               => container,
          :timestamp              => timestamp.utc.iso8601.to_s,
          :capture_interval_name  => 'realtime',
          :derived_vm_numvcpus    => 4,
          :cpu_usage_rate_average => 70
        )
      end

      # Persist only on batch boundaries; a batch size of 0 disables batching.
      next unless MEMORY_SAVING_CONTAINERS_BATCH > 0 && (i % MEMORY_SAVING_CONTAINERS_BATCH).zero?

      persister.persist!
      saved_collections = persister.collections.values
      inserted_records_count = saved_collections.sum { |collection| collection.created_records.count }
      updated_records_count = saved_collections.sum { |collection| collection.updated_records.count }
      # Start the next batch with a fresh persister so the saved records can be released.
      persister = create_persister(ems)

      batch_end_time = Time.now.utc
      batch_time = batch_end_time - batch_start_time
      container_left = indexes.last - i
      batches_left = container_left / MEMORY_SAVING_CONTAINERS_BATCH

      puts "------------------------------"
      puts "Time taken building the batch: #{batch_time}s, inserted total #{inserted_records_count} and updated total #{updated_records_count} records"
      puts "Container left processing #{container_left}, total records left #{(inserted_records_count + updated_records_count) * batches_left}, estimated time left ==== #{(batches_left * batch_time) / (60.0)} ==== minutes"
      puts "------------------------------"

      batch_start_time = batch_end_time
    end

    # Save whatever was built after the last full batch.
    persister.persist!
  end
end
# Report how long the whole run took, measured from start_time.
end_time = Time.now.utc
total_seconds = end_time - start_time
puts "Time taken total: #{total_seconds}s"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment