Last active
January 10, 2018 15:05
-
-
Save Ladas/71da61f9ecbb3b99a090de7ab679ec52 to your computer and use it in GitHub Desktop.
Generates data in parallel; with 8 processes the throughput is about 86k records per second.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ActiveRecord::Base.logger = Logger.new(STDOUT)

# Unique index columns for every model that mock rows are generated for.
# Insertion order: the :ems_id/:ems_ref models first, then the explicit entries.
UNIQUE_INDEXES_FOR_MODELS = [
  # Just having :ems_id & :ems_ref
  ContainerBuild,
  ContainerBuildPod,
  ContainerGroup,
  ContainerLimit,
  ContainerNode,
  ContainerProject,
  ContainerQuota,
  ContainerReplicator,
  ContainerRoute,
  ContainerService,
  ContainerTemplate,
  Container,
  PersistentVolumeClaim,
].each_with_object({}) { |model, indexes| indexes[model] = %i[ems_id ems_ref] }.merge(
  {
    # Having :ems_id but not :ems_ref
    ContainerImage             => %i[ems_id image_ref],
    ContainerImageRegistry     => %i[ems_id host port],
    # Nested tables without :ems_id, where the foreign key is part of the unique index
    ContainerCondition         => %i[container_entity_id container_entity_type name],
    SecurityContext            => %i[resource_id resource_type],
    ComputerSystem             => %i[managed_entity_id managed_entity_type],
    ContainerEnvVar            => %i[container_id name value field_path],
    ContainerLimitItem         => %i[container_limit_id resource item_type],
    ContainerPortConfig        => %i[container_id ems_ref],
    ContainerQuotaScope        => %i[container_quota_id scope],
    ContainerQuotaItem         => %i[container_quota_id resource],
    ContainerServicePortConfig => %i[container_service_id name],
    ContainerTemplateParameter => %i[container_template_id name],
    ContainerVolume            => %i[parent_id parent_type name],
    CustomAttribute            => %i[resource_id resource_type name unique_name section source],
    Hardware                   => %i[vm_or_template_id host_id computer_system_id],
    OperatingSystem            => %i[vm_or_template_id host_id computer_system_id],
  }
).freeze

# Rows generated per model in a single pass.
NUMBER_OF_RECORDS = 1_200_000
# How many passes (identical row sets) are generated per model.
NUMBER_OF_DUPLICATES = 3
# Maximum number of rows in one multi-row INSERT statement.
BATCH_SIZE = 10_000
# Builds synthetic rows for a model's unique index columns and relies on the
# included ManagerRefresh SqlHelper to turn them into batch INSERT statements.
#
# NOTE(review): SqlHelper is expected to provide #get_connection and
# #build_insert_query and to call back into the private hooks defined here
# (#primary_key, #supports_*?, #on_conflict_update, ...). Confirm this against
# the ManagerRefresh version in use.
class MockSaver
  include ManagerRefresh::SaveCollection::Saver::SqlHelper

  # @param model_class [Class] ActiveRecord model whose table will be populated
  # @param unique_index_columns [Array<Symbol>] columns of that model's unique index
  def initialize(model_class, unique_index_columns)
    @model_class          = model_class
    @table_name           = model_class.table_name
    @unique_index_columns = unique_index_columns
  end

  # Builds one row hash covering +model+'s unique index columns, plus the STI
  # type and timestamp columns the model supports.
  #
  # @param model [Class] key into UNIQUE_INDEXES_FOR_MODELS selecting the columns
  # @param foreign_key_value [Object] value for every column whose name ends in "id"
  # @param string_value [Object] value for every other unique index column
  # @return [Hash{Symbol => Object}] the row, ready for #build_insert_query
  # @raise [ArgumentError] if +model+ has no entry in UNIQUE_INDEXES_FOR_MODELS
  def build_data(model, foreign_key_value, string_value)
    data = model_unique_keys(model).each_with_object({}) do |key, row|
      row[key] = build_value(key, foreign_key_value, string_value)
    end
    assign_attributes_for_create!(data)
    data
  end

  private

  attr_reader :unique_index_columns, :table_name, :model_class

  # One timestamp shared by every row this saver instance builds.
  def time_now
    @time_now ||= Time.now.utc
  end

  def primary_key
    :id
  end

  # Columns ending in "id" are treated as foreign keys and get the numeric value.
  # Core String#end_with? is used so this does not depend on ActiveSupport.
  def build_value(key, foreign_key_value, string_value)
    key.to_s.end_with?("id") ? foreign_key_value : string_value
  end

  def assign_attributes_for_update!(hash, update_time)
    hash[:updated_on] = update_time if supports_updated_on?
    hash[:updated_at] = update_time if supports_updated_at?
  end

  def assign_attributes_for_create!(hash, create_time = time_now)
    hash[:type] = model_class.name if supports_sti? && hash[:type].nil?
    hash[:created_on] = create_time if supports_created_on?
    hash[:created_at] = create_time if supports_created_at?
    assign_attributes_for_update!(hash, create_time)
  end

  def supports_sti?
    cached_support(:sti) { model_class.column_names.include?("type") }
  end

  def supports_created_on?
    cached_support(:created_on) { timestamp_column_supported?("created_on") }
  end

  def supports_updated_on?
    cached_support(:updated_on) { timestamp_column_supported?("updated_on") }
  end

  def supports_created_at?
    cached_support(:created_at) { timestamp_column_supported?("created_at") }
  end

  def supports_updated_at?
    cached_support(:updated_at) { timestamp_column_supported?("updated_at") }
  end

  # True when the table has +column+ and ActiveRecord timestamping is enabled.
  def timestamp_column_supported?(column)
    model_class.column_names.include?(column) && ActiveRecord::Base.record_timestamps
  end

  # Memoizes the block's result under +key+. key? (not ||=) is used so that a
  # cached false is not recomputed on every row.
  def cached_support(key)
    @support_cache ||= {}
    return @support_cache[key] if @support_cache.key?(key)

    @support_cache[key] = yield
  end

  # Fails fast with a descriptive error instead of a NoMethodError on nil.
  def model_unique_keys(model)
    models.fetch(model) { raise ArgumentError, "No unique index columns defined for #{model}" }
  end

  def models
    UNIQUE_INDEXES_FOR_MODELS
  end

  def supports_remote_data_timestamp?(_)
    false
  end

  def on_conflict_update
    false
  end
end
# Runs +block+ over +jobs+ with the `parallel` gem and returns the collected results.
#
# @param jobs [Array] work items; each is yielded to +block+ (arrays are splatted
#   for multi-argument blocks, e.g. [model_class, keys] pairs)
# @param parallel_options [Hash] options forwarded to Parallel.map, e.g.
#   `in_processes: 4` or `in_threads: 8`. Defaults to 8 processes when none are given.
# @return [Array] the block's return value for each job
def process_in_parallel(jobs, **parallel_options, &block)
  # Loaded lazily so the gem is only needed when parallel generation actually runs.
  require 'parallel'

  parallel_options = { :in_processes => 8 } if parallel_options.empty?
  Parallel.map(jobs, parallel_options, &block)
end
################################
# Generate the mock data
################################
# One parallel job per model. Each job generates the full set of NUMBER_OF_RECORDS
# rows NUMBER_OF_DUPLICATES times and writes them with multi-row INSERTs of at most
# BATCH_SIZE rows each.
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

process_in_parallel(UNIQUE_INDEXES_FOR_MODELS.to_a) do |model_class, keys|
  ActiveRecord::Base.connection_pool.with_connection do
    NUMBER_OF_DUPLICATES.times do
      # A fresh saver per pass, so each pass stamps its rows with its own creation time.
      mock_saver = MockSaver.new(model_class, keys)

      # each_slice yields full batches and then the final partial batch, so no
      # separate "save remaining data" step is needed.
      (1..NUMBER_OF_RECORDS).each_slice(BATCH_SIZE) do |indexes|
        batch = indexes.map { |i| mock_saver.build_data(model_class, i, "string_#{i}") }
        mock_saver.get_connection.execute(
          mock_saver.build_insert_query(batch.first.keys, batch)
        )
      end
    end
  end

  # Parallel.map collects each job's return value; nothing needs to be sent back.
  nil
end

# Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
puts "Time taken #{elapsed}s"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment