Files
redux-scraper/app/lib/fork_future.rb
2025-01-01 03:29:53 +00:00

96 lines
2.0 KiB
Ruby

# typed: true
class ForkFuture
def initialize(&block)
read, write = ::IO.pipe
@read = read
pid =
::Process.fork do
start = Time.now
read.close
begin
result = block.call
rescue StandardError
result = RuntimeError.new([$!.message, $!.backtrace])
end
duration = Time.now - start
::Marshal.dump({ duration: duration, result: result }, write)
::Process.exit!(true)
end
write.close
end
def self.parallel_map(num_processes, enumerator, &block)
ForkFuture
.each_slice_impl(num_processes, enumerator)
.map { |slice| ForkFuture.new { slice.map(&block) } }
.to_a
.map(&:join)
.flatten(1)
end
def self.parallel_each(num_processes, enumerator, &block)
ForkFuture
.each_slice_impl(num_processes, enumerator)
.map do |slice|
ForkFuture.new do
slice.each(&block)
nil
end
end
.to_a
.map(&:join)
end
def self.parallel_each_slice(num_processes, enumerator, &block)
ForkFuture
.each_slice_impl(num_processes, enumerator)
.map do |slice|
ForkFuture.new do
block.call(slice)
nil
end
end
.to_a
.map(&:join)
end
def self.parallel_map_slice(num_processes, enumerator, &block)
ForkFuture
.each_slice_impl(num_processes, enumerator)
.map { |slice| ForkFuture.new { block.call(slice) } }
.to_a
.map(&:join)
end
def join
wait!
r = @result[:result]
raise r if r.is_a? RuntimeError
r
end
def duration
wait!
@result[:duration]
end
private
def self.each_slice_impl(num_processes, enumerator)
size = enumerator.size
slice_size = (size.to_f / num_processes).ceil
slice_size = [1, slice_size].max
raise if (slice_size * num_processes) < size
enumerator.each_slice(slice_size)
end
def wait!
@result ||=
begin
result_buffer = @read.read
@read.close
::Marshal.load(result_buffer)
end
end
end