Class: GraphQL::Dataloader::AsyncDataloader::Run

Inherits:
Object
  • Object
show all
Defined in:
lib/graphql/dataloader/async_dataloader.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(root_task, trace, total_fiber_limit, jobs_fiber_limit) ⇒ Run

Returns a new instance of Run.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/graphql/dataloader/async_dataloader.rb', line 20

def initialize(root_task, trace, total_fiber_limit, jobs_fiber_limit)
  @root_task = root_task
  @trace = trace
  @total_fiber_limit = total_fiber_limit
  @jobs_fiber_limit = jobs_fiber_limit

  @finished_tasks = nil
  @started_tasks = nil
  @started_count_task = nil
  @finished_count_task = nil
  @finished_all_tasks = nil
  @finished_first_pass = nil

  @snoozed_jobs_condition = Async::Condition.new
  @snoozed_sources_condition = Async::Condition.new
end

Instance Attribute Details

#finished_tasksObject (readonly)

Returns the value of attribute finished_tasks.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def finished_tasks
  @finished_tasks
end

#jobs_fiber_limitObject (readonly)

Returns the value of attribute jobs_fiber_limit.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def jobs_fiber_limit
  @jobs_fiber_limit
end

#root_taskObject (readonly)

Returns the value of attribute root_task.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def root_task
  @root_task
end

#snoozed_jobs_conditionObject (readonly)

Returns the value of attribute snoozed_jobs_condition.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def snoozed_jobs_condition
  @snoozed_jobs_condition
end

#snoozed_sources_conditionObject (readonly)

Returns the value of attribute snoozed_sources_condition.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def snoozed_sources_condition
  @snoozed_sources_condition
end

#started_tasksObject (readonly)

Returns the value of attribute started_tasks.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def started_tasks
  @started_tasks
end

#total_fiber_limitObject (readonly)

Returns the value of attribute total_fiber_limit.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def total_fiber_limit
  @total_fiber_limit
end

#traceObject (readonly)

Returns the value of attribute trace.



37
38
39
# File 'lib/graphql/dataloader/async_dataloader.rb', line 37

def trace
  @trace
end

Instance Method Details

#allowed_sources_tasksObject



44
45
46
47
48
49
50
51
# File 'lib/graphql/dataloader/async_dataloader.rb', line 44

def allowed_sources_tasks
  within_limit = total_fiber_limit - running_count
  if within_limit < 1
    1
  else
    within_limit
  end
end

#close_queuesObject



53
54
55
56
57
58
59
# File 'lib/graphql/dataloader/async_dataloader.rb', line 53

def close_queues
  @finished_tasks.close
  @finished_count_task.cancel

  @started_tasks.close
  @started_count_task.cancel
end

#jobs_bandwidth?Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/graphql/dataloader/async_dataloader.rb', line 40

def jobs_bandwidth?
  running_count < jobs_fiber_limit
end

#new_queuesObject



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/graphql/dataloader/async_dataloader.rb', line 78

def new_queues
  @finished_tasks = Async::Queue.new
  @finished_count = 0
  @started_tasks = Async::Queue.new
  @started_count = 0
  @finished_first_pass = Async::Promise.new
  @finished_all_tasks = Async::Promise.new

  @started_count_task = @root_task.async do
    @finished_first_pass.wait
    while _t = @started_tasks.wait
      @started_count += 1
      if @finished_count == @started_count
        @finished_all_tasks.resolve(true)
      end
    end
  end

  @finished_count_task = @root_task.async do
    while t_or_err = @finished_tasks.wait
      if t_or_err.is_a?(StandardError)
        @finished_all_tasks.reject(t_or_err)
      else
        @finished_count +=1
        if @finished_count == @started_count
          @finished_all_tasks.resolve(true)
        end
      end
    end
  end
end

#running?Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/graphql/dataloader/async_dataloader.rb', line 110

def running?
  @snoozed_jobs_condition.waiting? || @snoozed_sources_condition.waiting?
end

#running_countObject



61
62
63
64
65
66
67
# File 'lib/graphql/dataloader/async_dataloader.rb', line 61

def running_count
  @snoozed_jobs_condition.instance_variable_get(:@ready).num_waiting +
    @snoozed_sources_condition.instance_variable_get(:@ready).num_waiting +
    @started_count +
    @started_tasks.size -
    @finished_count
end

#wait_for_queuesObject



69
70
71
72
73
74
75
76
# File 'lib/graphql/dataloader/async_dataloader.rb', line 69

def wait_for_queues
  if !@finished_first_pass.resolved?
    @finished_first_pass.resolve(true)
  end

  @finished_all_tasks.wait
  @finished_all_tasks = Async::Promise.new
end