Class: GraphQL::Dataloader
- Inherits: Object
- Defined in:
- lib/graphql/dataloader.rb,
lib/graphql/dataloader/source.rb,
lib/graphql/dataloader/request.rb,
lib/graphql/dataloader/request_all.rb,
lib/graphql/dataloader/null_dataloader.rb,
lib/graphql/dataloader/async_dataloader.rb
Overview
This plugin supports Fiber-based concurrency, along with GraphQL::Dataloader::Source, so that related data can be batch-loaded during query execution.
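The core idea can be sketched with plain Ruby Fibers (an illustrative stand-in, not the library's actual implementation): each job pauses when it needs data, one batched "fetch" runs for all collected keys, then the paused jobs resume with their results.

```ruby
# Minimal sketch of Fiber-based batching (illustrative only; the real
# Dataloader also manages nested Sources, fiber variables, and errors).
pending = []   # keys requested by paused jobs
loaded  = {}   # results of the one batched "fetch"

jobs = [1, 2, 3].map do |id|
  Fiber.new do
    pending << id      # ask for this key...
    Fiber.yield        # ...and pause until the batch has run
    loaded[id]         # resumed: the data is ready now
  end
end

jobs.each(&:resume)                         # pass 1: every job enqueues its key
pending.each { |id| loaded[id] = id * 10 }  # one batched load for all keys
results = jobs.map(&:resume)                # pass 2: jobs finish with their data
# results == [10, 20, 30]
```

Instead of three separate loads, the "fetch" ran once with all three keys — the same shape as a batched database or HTTP call behind a Source.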
Direct Known Subclasses
AsyncDataloader, NullDataloader
Defined Under Namespace
Classes: AsyncDataloader, NullDataloader, Request, RequestAll, Source
Constant Summary
- NonblockingDataloader =
Class.new(self) { self.default_nonblocking = true }
Class Attribute Summary
-
.default_nonblocking ⇒ Object
Returns the value of attribute default_nonblocking.
Class Method Summary
-
.use(schema, nonblocking: nil) ⇒ Object
-
.with_dataloading(&block) ⇒ Object
Call the block with a Dataloader instance, then run all enqueued jobs and return the result of the block.
Instance Method Summary
-
#append_job(&job) ⇒ Object
-
#clear_cache ⇒ void
Clear any already-loaded objects from Source caches.
-
#get_fiber_variables ⇒ Hash<Symbol, Object>
This is called before the fiber is spawned, from the parent context (i.e. from the thread or fiber that it is scheduled from).
-
#initialize(nonblocking: self.class.default_nonblocking) ⇒ Dataloader
constructor
A new instance of Dataloader.
-
#nonblocking? ⇒ Boolean
-
#run ⇒ Object
-
#run_fiber(f) ⇒ Object
-
#run_isolated ⇒ Object
Use a self-contained queue for the work in the block.
-
#set_fiber_variables(vars) ⇒ void
Set up the fiber variables in a new fiber.
-
#spawn_fiber ⇒ Object
-
#with(source_class, *batch_args, **batch_kwargs) ⇒ Object
truffle-ruby wasn’t doing well with the implementation below.
-
#yield ⇒ void
Tell the dataloader that this fiber is waiting for data.
Constructor Details
#initialize(nonblocking: self.class.default_nonblocking) ⇒ Dataloader
Returns a new instance of Dataloader.
# File 'lib/graphql/dataloader.rb', line 53

def initialize(nonblocking: self.class.default_nonblocking)
  @source_cache = Hash.new { |h, k| h[k] = {} }
  @pending_jobs = []
  if !nonblocking.nil?
    @nonblocking = nonblocking
  end
end
Class Attribute Details
.default_nonblocking ⇒ Object
Returns the value of attribute default_nonblocking.
# File 'lib/graphql/dataloader.rb', line 27

def default_nonblocking
  @default_nonblocking
end
Class Method Details
.use(schema, nonblocking: nil) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 32

def self.use(schema, nonblocking: nil)
  schema.dataloader_class = if nonblocking
    warn("`nonblocking: true` is deprecated from `GraphQL::Dataloader`, please use `GraphQL::Dataloader::AsyncDataloader` instead. Docs: https://graphql-ruby.org/dataloader/async_dataloader.")
    NonblockingDataloader
  else
    self
  end
end
.with_dataloading(&block) ⇒ Object
Call the block with a Dataloader instance, then run all enqueued jobs and return the result of the block.
# File 'lib/graphql/dataloader.rb', line 43

def self.with_dataloading(&block)
  dataloader = self.new
  result = nil
  dataloader.append_job {
    result = block.call(dataloader)
  }
  dataloader.run
  result
end
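The shape of this method — capture the block's result from inside a queued job, drain the queue, then return the result — can be mimicked in plain Ruby (a hypothetical stand-in; the real method queues onto a Dataloader instance and calls its `run`):

```ruby
# Sketch of the `with_dataloading` pattern: the block runs as a queued job,
# so its result must be smuggled out through a captured local variable.
def run_with_queue
  queue = []
  result = nil
  queue << -> { result = yield }      # queue the block, capture its result
  queue.shift.call until queue.empty? # stand-in for `dataloader.run`
  result
end

run_with_queue { 2 + 2 }  # => 4
```

The indirection matters because the block does not run when it is enqueued; it runs later, inside the work loop, so a plain `return block.call` would execute it too early.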
Instance Method Details
#append_job(&job) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 132

def append_job(&job)
  # Given a block, queue it up to be worked through when `#run` is called.
  # (If the dataloader is already running, then a Fiber will pick this up later.)
  @pending_jobs.push(job)
  nil
end
#clear_cache ⇒ void
This method returns an undefined value.
Clear any already-loaded objects from Source caches
# File 'lib/graphql/dataloader.rb', line 141

def clear_cache
  @source_cache.each do |_source_class, batched_sources|
    batched_sources.each_value(&:clear_cache)
  end
  nil
end
#get_fiber_variables ⇒ Hash<Symbol, Object>
This is called before the fiber is spawned, from the parent context (i.e. from the thread or fiber that it is scheduled from).
# File 'lib/graphql/dataloader.rb', line 69

def get_fiber_variables
  fiber_vars = {}
  Thread.current.keys.each do |fiber_var_key|
    # This variable should be fresh in each new fiber
    if fiber_var_key != :__graphql_runtime_info
      fiber_vars[fiber_var_key] = Thread.current[fiber_var_key]
    end
  end
  fiber_vars
end
#nonblocking? ⇒ Boolean
# File 'lib/graphql/dataloader.rb', line 61

def nonblocking?
  @nonblocking
end
#run ⇒ Object
# File 'lib/graphql/dataloader.rb', line 176

def run
  job_fibers = []
  next_job_fibers = []
  source_fibers = []
  next_source_fibers = []
  first_pass = true
  manager = spawn_fiber do
    while first_pass || job_fibers.any?
      first_pass = false
      while (f = job_fibers.shift || spawn_job_fiber)
        if f.alive?
          finished = run_fiber(f)
          if !finished
            next_job_fibers << f
          end
        end
      end
      join_queues(job_fibers, next_job_fibers)

      while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
        while (f = source_fibers.shift || spawn_source_fiber)
          if f.alive?
            finished = run_fiber(f)
            if !finished
              next_source_fibers << f
            end
          end
        end
        join_queues(source_fibers, next_source_fibers)
      end
    end
  end

  run_fiber(manager)
rescue UncaughtThrowError => e
  throw e.tag, e.value
end
#run_fiber(f) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 217

def run_fiber(f)
  if use_fiber_resume?
    f.resume
  else
    f.transfer
  end
end
#run_isolated ⇒ Object
Use a self-contained queue for the work in the block.
# File 'lib/graphql/dataloader.rb', line 149

def run_isolated
  prev_queue = @pending_jobs
  prev_pending_keys = {}
  @source_cache.each do |source_class, batched_sources|
    batched_sources.each do |batch_args, batched_source_instance|
      if batched_source_instance.pending?
        prev_pending_keys[batched_source_instance] = batched_source_instance.pending.dup
        batched_source_instance.pending.clear
      end
    end
  end

  @pending_jobs = []
  res = nil
  # Make sure the block is inside a Fiber, so it can `Fiber.yield`
  append_job {
    res = yield
  }
  run

  res
ensure
  @pending_jobs = prev_queue
  prev_pending_keys.each do |source_instance, pending|
    source_instance.pending.merge!(pending)
  end
end
#set_fiber_variables(vars) ⇒ void
This method returns an undefined value.
Set up the fiber variables in a new fiber.
This is called within the fiber, right after it is spawned.
# File 'lib/graphql/dataloader.rb', line 86

def set_fiber_variables(vars)
  vars.each { |k, v| Thread.current[k] = v }
  nil
end
#spawn_fiber ⇒ Object
# File 'lib/graphql/dataloader.rb', line 225

def spawn_fiber
  fiber_vars = get_fiber_variables
  parent_fiber = use_fiber_resume? ? nil : Fiber.current
  Fiber.new(blocking: !@nonblocking) {
    set_fiber_variables(fiber_vars)
    Thread.current[:parent_fiber] = parent_fiber
    yield
    # With `.transfer`, you have to explicitly pass back to the parent --
    # if the fiber is allowed to terminate normally, control is passed to the main fiber instead.
    if parent_fiber
      parent_fiber.transfer(true)
    else
      true
    end
  }
end
#with(source_class, *batch_args, **batch_kwargs) ⇒ Object
truffle-ruby wasn’t doing well with the implementation below
# File 'lib/graphql/dataloader.rb', line 98

def with(source_class, *batch_args)
  batch_key = source_class.batch_key_for(*batch_args)
  @source_cache[source_class][batch_key] ||= begin
    source = source_class.new(*batch_args)
    source.setup(self)
    source
  end
end
#yield ⇒ void
This method returns an undefined value.
Tell the dataloader that this fiber is waiting for data.
Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).
# File 'lib/graphql/dataloader.rb', line 121

def yield
  if use_fiber_resume?
    Fiber.yield
  else
    parent_fiber = Thread.current[:parent_fiber]
    parent_fiber.transfer
  end
  nil
end