Class: GraphQL::Dataloader
- Inherits: Object
- Defined in:
- lib/graphql/dataloader.rb,
lib/graphql/dataloader/source.rb,
lib/graphql/dataloader/request.rb,
lib/graphql/dataloader/request_all.rb,
lib/graphql/dataloader/null_dataloader.rb
Overview
This plugin supports Fiber-based concurrency, along with Source.
Direct Known Subclasses: NullDataloader
Defined Under Namespace
Classes: NullDataloader, Request, RequestAll, Source
Class Method Summary
- .use(schema) ⇒ Object
Instance Method Summary
- #append_job(&job) ⇒ Object
- #initialize ⇒ Dataloader (constructor): A new instance of Dataloader.
- #run ⇒ Object
- #with(source_class, *batch_parameters) ⇒ GraphQL::Dataloader::Source: Get a Source instance from this dataloader, for calling .load(...) or .request(...) on.
- #yield ⇒ void: Tell the dataloader that this fiber is waiting for data.
Constructor Details
#initialize ⇒ Dataloader
Returns a new instance of Dataloader.
# File 'lib/graphql/dataloader.rb', line 30

def initialize
  @source_cache = Hash.new { |h, source_class|
    h[source_class] = Hash.new { |h2, batch_parameters|
      source = source_class.new(*batch_parameters)
      source.setup(self)
      h2[batch_parameters] = source
    }
  }
  @pending_jobs = []
end
Class Method Details
.use(schema) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 26

def self.use(schema)
  schema.dataloader_class = self
end
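For context, a minimal sketch of attaching the dataloader to a schema; MySchema is an illustrative name, and Schema.use is what ends up calling Dataloader.use(schema):

class MySchema < GraphQL::Schema
  # Registers the plugin with this schema, which invokes
  # GraphQL::Dataloader.use(self) and sets schema.dataloader_class.
  use GraphQL::Dataloader
end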
Instance Method Details
#append_job(&job) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 61

def append_job(&job)
  # Given a block, queue it up to be worked through when `#run` is called.
  # (If the dataloader is already running, then a Fiber will pick this up later.)
  @pending_jobs.push(job)
  nil
end
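A brief sketch of queueing work on a standalone dataloader instance and then working through the queue with #run; the jobs themselves are illustrative:

dataloader = GraphQL::Dataloader.new
results = []
dataloader.append_job { results << :first }
dataloader.append_job { results << :second }
dataloader.run
results # => [:first, :second]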
#run ⇒ Object
# File 'lib/graphql/dataloader.rb', line 69

def run
  # At a high level, the algorithm is:
  #
  #  A) Inside Fibers, run jobs from the queue one-by-one
  #    - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
  #    - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
  #    - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
  #  B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
  #    - Similarly, create a Fiber to consume pending sources and tell them to load their data.
  #    - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
  #    - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
  #  C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
  #    - Those Fibers assume that source caches will have been populated with the data they were waiting for.
  #    - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
  #  D) Once all pending fibers have been resumed once, return to `A` above.
  #
  # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
  # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
  #
  pending_fibers = []
  next_fibers = []
  first_pass = true

  while first_pass || (f = pending_fibers.shift)
    if first_pass
      first_pass = false
    else
      # These fibers were previously waiting for sources to load data,
      # resume them. (They might wait again, in which case, re-enqueue them.)
      resume(f)
      if f.alive?
        next_fibers << f
      end
    end

    while @pending_jobs.any?
      # Create a Fiber to consume jobs until one of the jobs yields
      # or jobs run out
      f = Fiber.new {
        while (job = @pending_jobs.shift)
          job.call
        end
      }
      resume(f)
      # In this case, the job yielded. Queue it up to run again after
      # we load whatever it's waiting for.
      if f.alive?
        next_fibers << f
      end
    end

    if pending_fibers.empty?
      # Now, run all Sources which have become pending _before_ resuming GraphQL execution.
      # Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
      #
      # This is where an evented approach would be even better -- can we tell which
      # fibers are ready to continue, and continue execution there?
      #
      source_fiber_stack = if (first_source_fiber = create_source_fiber)
        [first_source_fiber]
      else
        nil
      end

      if source_fiber_stack
        # Use a stack with `.pop` here so that when a source causes another source to become pending,
        # that newly-pending source will run _before_ the one that depends on it.
        # (See below where the old fiber is pushed to the stack, then the new fiber is pushed on the stack.)
        while (outer_source_fiber = source_fiber_stack.pop)
          resume(outer_source_fiber)

          if outer_source_fiber.alive?
            source_fiber_stack << outer_source_fiber
          end

          # If this source caused more sources to become pending, run those before running this one again:
          next_source_fiber = create_source_fiber
          if next_source_fiber
            source_fiber_stack << next_source_fiber
          end
        end
      end

      # Move newly-enqueued Fibers on to the list to be resumed.
      # Clear out the list of next-round Fibers, so that
      # any Fibers that pause can be put on it.
      pending_fibers.concat(next_fibers)
      next_fibers.clear
    end
  end

  if @pending_jobs.any?
    raise "Invariant: #{@pending_jobs.size} pending jobs"
  elsif pending_fibers.any?
    raise "Invariant: #{pending_fibers.size} pending fibers"
  elsif next_fibers.any?
    raise "Invariant: #{next_fibers.size} next fibers"
  end

  nil
end
#with(source_class, *batch_parameters) ⇒ GraphQL::Dataloader::Source
Get a Source instance from this dataloader, for calling .load(...) or .request(...) on.
# File 'lib/graphql/dataloader.rb', line 46

def with(source_class, *batch_parameters)
  @source_cache[source_class][batch_parameters]
end
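As a usage sketch, a resolver might fetch a batched record through a custom Source. The RecordSource class, the Record model, and the field method below are hypothetical names, and the sketch assumes the current dataloader is reachable via context.dataloader inside a resolver:

class RecordSource < GraphQL::Dataloader::Source
  def fetch(ids)
    # One query for the whole batch of requested ids (hypothetical ORM call),
    # returning results in the same order as the ids.
    records = Record.where(id: ids)
    ids.map { |id| records.find { |r| r.id == id } }
  end
end

# Inside a field resolver:
def record(id:)
  context.dataloader.with(RecordSource).load(id)
end

Because #with caches Source instances per source_class and batch_parameters, repeated calls with the same arguments return the same Source, so their requested keys accumulate into a single batch.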
#yield ⇒ void
This method returns an undefined value.
Tell the dataloader that this fiber is waiting for data.
Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).
# File 'lib/graphql/dataloader.rb', line 55

def yield
  Fiber.yield
  nil
end
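To illustrate the intent, a rough sketch of how waiting code might call #yield; the helper method and instance variables below are hypothetical, not the actual Source internals:

# Inside a hypothetical batching helper:
def wait_for(key)
  @pending_keys << key
  @dataloader.yield            # pause this Fiber; Dataloader#run resumes it later
  @results.fetch(key)          # assumed to be filled in by another Fiber before resuming
end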