Class: GraphQL::Dataloader

- Inherits: Object
- Defined in:
  lib/graphql/dataloader.rb,
  lib/graphql/dataloader/source.rb,
  lib/graphql/dataloader/request.rb,
  lib/graphql/dataloader/request_all.rb,
  lib/graphql/dataloader/null_dataloader.rb,
  lib/graphql/dataloader/async_dataloader.rb,
  lib/graphql/dataloader/active_record_source.rb,
  lib/graphql/dataloader/active_record_association_source.rb
Overview
This plugin supports Fiber-based concurrency, along with Source.
Direct Known Subclasses
AsyncDataloader, NullDataloader
Defined Under Namespace
Classes: ActiveRecordAssociationSource, ActiveRecordSource, AsyncDataloader, NullDataloader, Request, RequestAll, Source
Class Attribute Summary

- .default_fiber_limit ⇒ Object
  Returns the value of attribute default_fiber_limit.
- .default_nonblocking ⇒ Object
  Returns the value of attribute default_nonblocking.

Instance Attribute Summary

- #fiber_limit ⇒ Integer? (readonly)

Class Method Summary

- .use(schema, nonblocking: nil, fiber_limit: nil) ⇒ Object
- .with_dataloading(&block) ⇒ Object
  Call the block with a Dataloader instance, then run all enqueued jobs and return the result of the block.

Instance Method Summary

- #append_job(callable = nil, &job) ⇒ Object
- #cleanup_fiber ⇒ Object
  This method is called when Dataloader is finished using a fiber.
- #clear_cache ⇒ void
  Clear any already-loaded objects from Source caches.
- #get_fiber_variables ⇒ Hash<Symbol, Object>
  This is called before the fiber is spawned, from the parent context (i.e. from the thread or fiber that it is scheduled from).
- #initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit) ⇒ Dataloader (constructor)
  A new instance of Dataloader.
- #lazy_at_depth(depth, lazy) ⇒ Object (private)
- #merge_records(records, index_by: :id) ⇒ void
  Pre-warm the Dataloader cache with ActiveRecord objects which were loaded elsewhere.
- #nonblocking? ⇒ Boolean
- #run(trace_query_lazy: nil) ⇒ Object
- #run_fiber(f) ⇒ Object
- #run_isolated ⇒ Object
  Use a self-contained queue for the work in the block.
- #set_fiber_variables(vars) ⇒ void
  Set up the fiber variables in a new fiber.
- #spawn_fiber ⇒ Object
- #with(source_class, *batch_args, **batch_kwargs) ⇒ Object
  Get a cached Source instance for the given class and batch arguments.
- #yield(source = Fiber[:__graphql_current_dataloader_source]) ⇒ void
  Tell the dataloader that this fiber is waiting for data.
Constructor Details
#initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit) ⇒ Dataloader
Returns a new instance of Dataloader.
# File 'lib/graphql/dataloader.rb', line 60

def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit)
  @source_cache = Hash.new { |h, k| h[k] = {} }
  @pending_jobs = []
  if !nonblocking.nil?
    @nonblocking = nonblocking
  end
  @fiber_limit = fiber_limit
  @lazies_at_depth = Hash.new { |h, k| h[k] = [] }
end
Class Attribute Details
.default_fiber_limit ⇒ Object
Returns the value of attribute default_fiber_limit.
# File 'lib/graphql/dataloader.rb', line 29

def default_fiber_limit
  @default_fiber_limit
end
.default_nonblocking ⇒ Object
Returns the value of attribute default_nonblocking.
# File 'lib/graphql/dataloader.rb', line 29

def default_nonblocking
  @default_nonblocking
end
Instance Attribute Details
#fiber_limit ⇒ Integer? (readonly)
# File 'lib/graphql/dataloader.rb', line 71

def fiber_limit
  @fiber_limit
end
Class Method Details
.use(schema, nonblocking: nil, fiber_limit: nil) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 32

def self.use(schema, nonblocking: nil, fiber_limit: nil)
  dataloader_class = if nonblocking
    warn("`nonblocking: true` is deprecated from `GraphQL::Dataloader`, please use `GraphQL::Dataloader::AsyncDataloader` instead. Docs: https://graphql-ruby.org/dataloader/async_dataloader.")
    Class.new(self) { self.default_nonblocking = true }
  else
    self
  end

  if fiber_limit
    dataloader_class = Class.new(dataloader_class)
    dataloader_class.default_fiber_limit = fiber_limit
  end

  schema.dataloader_class = dataloader_class
end
.with_dataloading(&block) ⇒ Object
Call the block with a Dataloader instance, then run all enqueued jobs and return the result of the block.
# File 'lib/graphql/dataloader.rb', line 50

def self.with_dataloading(&block)
  dataloader = self.new
  result = nil
  dataloader.append_job {
    result = block.call(dataloader)
  }
  dataloader.run
  result
end
Instance Method Details
#append_job(callable = nil, &job) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 144

def append_job(callable = nil, &job)
  # Given a block, queue it up to be worked through when `#run` is called.
  # (If the dataloader is already running, then a Fiber will pick this up later.)
  @pending_jobs.push(callable || job)
  nil
end
#cleanup_fiber ⇒ Object
This method is called when Dataloader is finished using a fiber. Use it to perform any cleanup, such as releasing database connections (if they must be released manually).
# File 'lib/graphql/dataloader.rb', line 102

def cleanup_fiber
end
#clear_cache ⇒ void
This method returns an undefined value.
Clear any already-loaded objects from Source caches
# File 'lib/graphql/dataloader.rb', line 153

def clear_cache
  @source_cache.each do |_source_class, batched_sources|
    batched_sources.each_value(&:clear_cache)
  end
  nil
end
#get_fiber_variables ⇒ Hash<Symbol, Object>
This is called before the fiber is spawned, from the parent context (i.e. from the thread or fiber that it is scheduled from).
# File 'lib/graphql/dataloader.rb', line 81

def get_fiber_variables
  fiber_vars = {}
  Thread.current.keys.each do |fiber_var_key|
    fiber_vars[fiber_var_key] = Thread.current[fiber_var_key]
  end
  fiber_vars
end
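The reason this snapshot exists: fiber-local variables (`Thread.current[...]`) don't automatically carry over into a newly-created Fiber, so Dataloader captures them in the parent and replays them in the child (see #set_fiber_variables). A stdlib-only sketch of that pattern, with a hypothetical `:request_id` variable:

```ruby
Thread.current[:request_id] = "abc123"

# A plain fiber doesn't see the parent's fiber-locals:
plain = Fiber.new { Thread.current[:request_id] }.resume
# plain => nil

# Snapshot in the parent (what get_fiber_variables does):
vars = {}
Thread.current.keys.each { |key| vars[key] = Thread.current[key] }

# Replay inside the child (what set_fiber_variables does):
copied = Fiber.new do
  vars.each { |key, value| Thread.current[key] = value }
  Thread.current[:request_id]
end.resume
# copied => "abc123"
```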
#lazy_at_depth(depth, lazy) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/graphql/dataloader.rb', line 246

def lazy_at_depth(depth, lazy)
  @lazies_at_depth[depth] << lazy
end
#merge_records(records, index_by: :id) ⇒ void
This method returns an undefined value.
Pre-warm the Dataloader cache with ActiveRecord objects which were loaded elsewhere.
These will be used by ActiveRecordSource, ActiveRecordAssociationSource, and their helper methods, dataload_record and dataload_association.
# File 'lib/graphql/dataloader.rb', line 265

def merge_records(records, index_by: :id)
  records_by_class = Hash.new { |h, k| h[k] = {} }
  records.each do |r|
    records_by_class[r.class][r.public_send(index_by)] = r
  end
  records_by_class.each do |r_class, records|
    with(ActiveRecordSource, r_class).merge(records)
  end
end
#nonblocking? ⇒ Boolean
# File 'lib/graphql/dataloader.rb', line 73

def nonblocking?
  @nonblocking
end
#run(trace_query_lazy: nil) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 198

def run(trace_query_lazy: nil)
  trace = Fiber[:__graphql_current_multiplex]&.current_trace
  jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
  job_fibers = []
  next_job_fibers = []
  source_fibers = []
  next_source_fibers = []
  first_pass = true
  manager = spawn_fiber do
    trace&.begin_dataloader(self)
    while first_pass || !job_fibers.empty?
      first_pass = false
      run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)

      if !@lazies_at_depth.empty?
        with_trace_query_lazy(trace_query_lazy) do
          run_next_pending_lazies(job_fibers, trace)
          run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
        end
      end
    end
    trace&.end_dataloader(self)
  end

  run_fiber(manager)

  if manager.alive?
    raise "Invariant: Manager fiber didn't terminate properly."
  end

  if !job_fibers.empty?
    raise "Invariant: job fibers should have exited but #{job_fibers.size} remained"
  end
  if !source_fibers.empty?
    raise "Invariant: source fibers should have exited but #{source_fibers.size} remained"
  end
rescue UncaughtThrowError => e
  throw e.tag, e.value
end
#run_fiber(f) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 241

def run_fiber(f)
  f.resume
end
#run_isolated ⇒ Object
Use a self-contained queue for the work in the block.
# File 'lib/graphql/dataloader.rb', line 161

def run_isolated
  prev_queue = @pending_jobs
  prev_pending_keys = {}
  prev_lazies_at_depth = @lazies_at_depth
  @lazies_at_depth = @lazies_at_depth.dup.clear
  # Clear pending loads but keep already-cached records
  # in case they are useful to the given block.
  @source_cache.each do |source_class, batched_sources|
    batched_sources.each do |batch_args, batched_source_instance|
      if batched_source_instance.pending?
        prev_pending_keys[batched_source_instance] = batched_source_instance.pending.dup
        batched_source_instance.pending.clear
      end
    end
  end
  @pending_jobs = []
  res = nil
  # Make sure the block is inside a Fiber, so it can `Fiber.yield`
  append_job {
    res = yield
  }
  run
  res
ensure
  @pending_jobs = prev_queue
  @lazies_at_depth = prev_lazies_at_depth
  prev_pending_keys.each do |source_instance, pending|
    pending.each do |key, value|
      if !source_instance.results.key?(key)
        source_instance.pending[key] = value
      end
    end
  end
end
#set_fiber_variables(vars) ⇒ void
This method returns an undefined value.
Set up the fiber variables in a new fiber.
This is called within the fiber, right after it is spawned.
# File 'lib/graphql/dataloader.rb', line 95

def set_fiber_variables(vars)
  vars.each { |k, v| Thread.current[k] = v }
  nil
end
#spawn_fiber ⇒ Object
# File 'lib/graphql/dataloader.rb', line 250

def spawn_fiber
  fiber_vars = get_fiber_variables
  Fiber.new(blocking: !@nonblocking) {
    set_fiber_variables(fiber_vars)
    yield
    cleanup_fiber
  }
end
#with(source_class, *batch_args, **batch_kwargs) ⇒ Object
Get a cached Source instance for the given class and batch arguments. (A note in the source mentions that truffle-ruby wasn't doing well with an alternate implementation of this method.)
# File 'lib/graphql/dataloader.rb', line 112

def with(source_class, *batch_args)
  batch_key = source_class.batch_key_for(*batch_args)
  @source_cache[source_class][batch_key] ||= begin
    source = source_class.new(*batch_args)
    source.setup(self)
    source
  end
end
#yield(source = Fiber[:__graphql_current_dataloader_source]) ⇒ void
This method returns an undefined value.
Tell the dataloader that this fiber is waiting for data.
Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).
# File 'lib/graphql/dataloader.rb', line 135

def yield(source = Fiber[:__graphql_current_dataloader_source])
  trace = Fiber[:__graphql_current_multiplex]&.current_trace
  trace&.dataloader_fiber_yield(source)
  Fiber.yield
  trace&.dataloader_fiber_resume(source)
  nil
end