Skip to content

Instantly share code, notes, and snippets.

@Ladas
Last active January 10, 2018 15:05
Show Gist options
  • Save Ladas/71da61f9ecbb3b99a090de7ab679ec52 to your computer and use it in GitHub Desktop.
Save Ladas/71da61f9ecbb3b99a090de7ab679ec52 to your computer and use it in GitHub Desktop.
Generates data in parallel; with 8 processes the speed is 86k records per second
# Send ActiveRecord's log output to standard output.
ActiveRecord::Base.logger = Logger.new(STDOUT)

# Registry of model class => columns used as that model's unique index.
# The generator below builds one value per listed column for every mock row.
UNIQUE_INDEXES_FOR_MODELS = {
  # Models keyed by :ems_id together with :ems_ref
  ContainerBuild             => %i[ems_id ems_ref],
  ContainerBuildPod          => %i[ems_id ems_ref],
  ContainerGroup             => %i[ems_id ems_ref],
  ContainerLimit             => %i[ems_id ems_ref],
  ContainerNode              => %i[ems_id ems_ref],
  ContainerProject           => %i[ems_id ems_ref],
  ContainerQuota             => %i[ems_id ems_ref],
  ContainerReplicator        => %i[ems_id ems_ref],
  ContainerRoute             => %i[ems_id ems_ref],
  ContainerService           => %i[ems_id ems_ref],
  ContainerTemplate          => %i[ems_id ems_ref],
  Container                  => %i[ems_id ems_ref],
  PersistentVolumeClaim      => %i[ems_id ems_ref],
  # Models that have :ems_id but no :ems_ref
  ContainerImage             => %i[ems_id image_ref],
  ContainerImageRegistry     => %i[ems_id host port],
  # Nested tables: no :ems_id, and the foreign key is itself part of the unique index
  ContainerCondition         => %i[container_entity_id container_entity_type name],
  SecurityContext            => %i[resource_id resource_type],
  ComputerSystem             => %i[managed_entity_id managed_entity_type],
  ContainerEnvVar            => %i[container_id name value field_path],
  ContainerLimitItem         => %i[container_limit_id resource item_type],
  ContainerPortConfig        => %i[container_id ems_ref],
  ContainerQuotaScope        => %i[container_quota_id scope],
  ContainerQuotaItem         => %i[container_quota_id resource],
  ContainerServicePortConfig => %i[container_service_id name],
  ContainerTemplateParameter => %i[container_template_id name],
  ContainerVolume            => %i[parent_id parent_type name],
  CustomAttribute            => %i[resource_id resource_type name unique_name section source],
  Hardware                   => %i[vm_or_template_id host_id computer_system_id],
  OperatingSystem            => %i[vm_or_template_id host_id computer_system_id],
}.freeze

# Rows generated per model in a single pass.
NUMBER_OF_RECORDS = 1_200_000
# How many times the same set of rows is generated and inserted for each model.
NUMBER_OF_DUPLICATES = 3
# Rows sent to the database per INSERT statement.
BATCH_SIZE = 10_000
# Stand-in saver used to generate mock rows for one model.
#
# It mixes in ManagerRefresh's SqlHelper; the generation script calls
# +get_connection+ and +build_insert_query+ on instances, neither of which is
# defined here, so they presumably come from that module. Several private
# methods below are not called anywhere in this file and are presumably hooks
# SqlHelper reads — TODO confirm against the SqlHelper source.
class MockSaver
  include ManagerRefresh::SaveCollection::Saver::SqlHelper

  # @param model_class [Class] model whose table the mock rows target; must respond
  #   to +table_name+ (and +column_names+/+name+ once rows are built)
  # @param unique_index_columns [Array<Symbol>] columns of the model's unique index
  def initialize(model_class, unique_index_columns)
    @model_class          = model_class
    @table_name           = model_class.table_name
    @unique_index_columns = unique_index_columns
  end

  # Builds a single row hash for +model+.
  #
  # Every unique-index column gets +foreign_key_value+ when it is an id column
  # and +string_value+ otherwise; STI type and timestamp columns are then added
  # where the model has them.
  #
  # @param model [Class] key into UNIQUE_INDEXES_FOR_MODELS
  # @param foreign_key_value [Object] value stored in id columns
  # @param string_value [Object] value stored in every other column
  # @return [Hash{Symbol => Object}] column => value
  def build_data(model, foreign_key_value, string_value)
    data = model_unique_keys(model).each_with_object({}) do |key, obj|
      obj[key] = build_value(key, foreign_key_value, string_value)
    end
    assign_attributes_for_create!(data)
    data
  end

  private

  attr_reader :unique_index_columns, :table_name, :model_class

  # Single timestamp shared by every row this saver builds (memoized on first use).
  def time_now
    @time_now ||= Time.now.utc
  end

  def primary_key
    :id
  end

  # Picks the mock value for +key+.
  #
  # Only +id+ itself or a column with an +_id+ suffix counts as an id column.
  # A bare "ends with id" test would also match columns such as +guid+ or +uuid+
  # and hand them the integer key. Core String#end_with? is used so the check
  # does not rely on the ActiveSupport alias.
  def build_value(key, foreign_key_value, string_value)
    column = key.to_s
    if column == "id" || column.end_with?("_id")
      foreign_key_value
    else
      string_value
    end
  end

  # Stamps the update timestamp columns the model has.
  def assign_attributes_for_update!(hash, update_time)
    hash[:updated_on] = update_time if supports_updated_on?
    hash[:updated_at] = update_time if supports_updated_at?
  end

  # Fills in the STI type (when not already set) and the create/update timestamps.
  def assign_attributes_for_create!(hash, create_time = time_now)
    hash[:type] = model_class.name if supports_sti? && hash[:type].nil?
    hash[:created_on] = create_time if supports_created_on?
    hash[:created_at] = create_time if supports_created_at?
    assign_attributes_for_update!(hash, create_time)
  end

  # True when the model's table has a +type+ column (memoized).
  def supports_sti?
    @supports_sti_cache = model_class.column_names.include?("type") if @supports_sti_cache.nil?
    @supports_sti_cache
  end

  def supports_created_on?
    timestamp_column_supported?("created_on")
  end

  def supports_updated_on?
    timestamp_column_supported?("updated_on")
  end

  def supports_created_at?
    timestamp_column_supported?("created_at")
  end

  def supports_updated_at?
    timestamp_column_supported?("updated_at")
  end

  # Shared, memoized check behind the four timestamp predicates above: the column
  # must exist on the model and ActiveRecord timestamp recording must be enabled.
  def timestamp_column_supported?(column_name)
    @timestamp_column_cache ||= {}
    unless @timestamp_column_cache.key?(column_name)
      @timestamp_column_cache[column_name] =
        (model_class.column_names.include?(column_name) && ActiveRecord::Base.record_timestamps)
    end
    @timestamp_column_cache[column_name]
  end

  # Unique-index columns registered for +model+. Falls back to the columns this
  # saver was constructed with when the model is missing from the registry,
  # instead of failing on nil.
  def model_unique_keys(model)
    models.fetch(model) { unique_index_columns }
  end

  def models
    UNIQUE_INDEXES_FOR_MODELS
  end

  def supports_remote_data_timestamp?(_)
    false
  end

  def on_conflict_update
    false
  end
end
# Hands +jobs+ and +block+ to Parallel.map, using worker processes.
#
# @param jobs [Enumerable] work items; each one is passed to +block+
# @param workers [Integer] number of worker processes; defaults to 8, the value
#   that used to be hard-coded, so existing callers are unaffected
# @yield [job] called for each work item
# @return whatever Parallel.map returns
def process_in_parallel(jobs, workers = 8, &block)
  # Required lazily, so the parallel gem is only needed once work is dispatched.
  require 'parallel'
  # Thread-based alternative:
  # Parallel.map(jobs, :in_threads => workers, &block)
  Parallel.map(jobs, :in_processes => workers, &block)
end
################################
# Generate the mock data
################################
# Each model is one parallel job. Inside a job the full id range is generated
# NUMBER_OF_DUPLICATES times and inserted BATCH_SIZE rows per statement.
start_time = Time.now.utc

process_in_parallel(UNIQUE_INDEXES_FOR_MODELS.to_a) do |model_class, keys|
  ActiveRecord::Base.connection_pool.with_connection do
    NUMBER_OF_DUPLICATES.times do
      mock_saver = MockSaver.new(model_class, keys)

      # each_slice yields the full batches and then any shorter remainder batch,
      # so no separate "save remaining data" step is needed.
      (1..NUMBER_OF_RECORDS).each_slice(BATCH_SIZE) do |batch|
        rows = batch.map { |i| mock_saver.build_data(model_class, i, "string_#{i}") }

        mock_saver.get_connection.execute(
          mock_saver.build_insert_query(rows.first.keys, rows)
        )
      end
    end
  end
end

end_time = Time.now.utc
puts "Time taken #{end_time - start_time}s"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment