17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/graphql/dataloader/async_dataloader.rb', line 17
# Runs queued jobs, source tasks and pending lazies until no job fibers remain.
#
# All of the work happens inside a single "manager" fiber created with
# +spawn_fiber+. Each pass of the manager loop:
#
# 1. calls +run_pending_steps+ for the job fibers,
# 2. opens a +Sync+ block and keeps spawning / re-queueing source tasks while
#    any are queued or any source in +@source_cache+ reports +pending?+,
# 3. if +@lazies_at_depth+ is not empty, runs the next pending lazies followed
#    by another round of +run_pending_steps+.
#
# The loop always runs at least once and then repeats while +job_fibers+ is
# not empty. When a trace object is available, +begin_dataloader+ and
# +end_dataloader+ are called on it around the loop.
#
# @param trace_query_lazy [Object, nil] passed through unchanged to
#   +with_trace_query_lazy+
# @return [void]
# @raise [RuntimeError] if the manager fiber is still alive after being resumed
def run(trace_query_lazy: nil)
  trace = Fiber[:__graphql_current_multiplex]&.current_trace
  jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
  job_fibers = []
  next_job_fibers = []
  source_tasks = []
  next_source_tasks = []
  first_pass = true
  sources_condition = Async::Condition.new

  manager = spawn_fiber do
    trace&.begin_dataloader(self)
    while first_pass || !job_fibers.empty?
      first_pass = false
      # Captured here and re-applied as the first step inside the Sync block below.
      fiber_vars = get_fiber_variables

      run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)

      Sync do |root_task|
        set_fiber_variables(fiber_vars)
        while !source_tasks.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
          # Take an already-queued task first; otherwise spawn a new one, but only
          # while the combined fiber/task count is below total_fiber_limit.
          while (task = (source_tasks.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size + next_source_tasks.size) < total_fiber_limit) && spawn_source_task(root_task, sources_condition, trace))))
            if task.alive?
              # Two separate statements on purpose: `yield` is called without
              # arguments, and only afterwards is the still-alive task queued
              # for the next round.
              root_task.yield
              next_source_tasks << task
            end
          end
          sources_condition.signal
          source_tasks.concat(next_source_tasks)
          next_source_tasks.clear
        end
      end

      unless @lazies_at_depth.empty?
        with_trace_query_lazy(trace_query_lazy) do
          run_next_pending_lazies(job_fibers, trace)
          run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
        end
      end
    end
    trace&.end_dataloader(self)
  end

  manager.resume
  raise "Invariant: Manager didn't terminate successfully: #{manager}" if manager.alive?
rescue UncaughtThrowError => e
  # Re-issue the throw with the same tag and value from this method's frame.
  throw e.tag, e.value
end
|