Class: GraphQL::Dataloader

Inherits: Object
Defined in:
lib/graphql/dataloader.rb,
lib/graphql/dataloader/source.rb,
lib/graphql/dataloader/request.rb,
lib/graphql/dataloader/request_all.rb,
lib/graphql/dataloader/null_dataloader.rb

Overview

This plugin supports Fiber-based concurrency, along with GraphQL::Dataloader::Source.

Examples:

Installing Dataloader


class MySchema < GraphQL::Schema
  use GraphQL::Dataloader
end

Waiting for batch-loaded data in a GraphQL field


field :team, Types::Team, null: true

def team
  dataloader.with(Sources::Record, Team).load(object.team_id)
end
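
Defining a source like the one above

Sources::Record is not shipped with the gem; this is a minimal sketch of such a Source subclass, assuming an ActiveRecord-style model class as the batch parameter:


module Sources
  class Record < GraphQL::Dataloader::Source
    def initialize(model_class)
      @model_class = model_class
    end

    def fetch(ids)
      # Called once per batch; results must line up with the requested ids:
      records = @model_class.where(id: ids)
      ids.map { |id| records.find { |r| r.id == id } }
    end
  end
end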

Direct Known Subclasses

NullDataloader

Defined Under Namespace

Classes: NullDataloader, Request, RequestAll, Source

Class Method Summary

Instance Method Summary

Constructor Details

#initialize ⇒ Dataloader

Returns a new instance of Dataloader.



# File 'lib/graphql/dataloader.rb', line 30

def initialize
  # Sources are cached per (source class, batch parameters) pair, so repeated
  # `#with` calls return the same instance and share one batch:
  @source_cache = Hash.new { |h, source_class| h[source_class] = Hash.new { |h2, batch_parameters|
      source = source_class.new(*batch_parameters)
      source.setup(self)
      h2[batch_parameters] = source
    }
  }
  # Blocks enqueued by `#append_job`, waiting for `#run` to work through them:
  @pending_jobs = []
end

Class Method Details

.use(schema) ⇒ Object



# File 'lib/graphql/dataloader.rb', line 26

def self.use(schema)
  schema.dataloader_class = self
end

Instance Method Details

#append_job(&job) ⇒ Object



# File 'lib/graphql/dataloader.rb', line 61

def append_job(&job)
  # Given a block, queue it up to be worked through when `#run` is called.
  # (If the dataloader is already running, then a Fiber will pick this up later.)
  @pending_jobs.push(job)
  nil
end
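
A sketch of using #append_job directly, outside of GraphQL execution, assuming only that the jobs are plain blocks:


dataloader = GraphQL::Dataloader.new

dataloader.append_job { puts "first" }
dataloader.append_job { puts "second" }

# Nothing has run yet; `#run` works through the queue:
dataloader.run
# Prints "first", then "second"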

#runObject



# File 'lib/graphql/dataloader.rb', line 69

def run
  # At a high level, the algorithm is:
  #
  #  A) Inside Fibers, run jobs from the queue one-by-one
  #    - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
  #    - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
  #    - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
  #  B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
  #    - Similarly, create a Fiber to consume pending sources and tell them to load their data.
  #    - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
  #    - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
  #  C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
  #    - Those Fibers assume that source caches will have been populated with the data they were waiting for.
  #    - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
  #  D) Once all pending fibers have been resumed once, return to `A` above.
  #
  # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
  # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
  #
  pending_fibers = []
  next_fibers = []
  first_pass = true

  while first_pass || (f = pending_fibers.shift)
    if first_pass
      first_pass = false
    else
      # These fibers were previously waiting for sources to load data,
      # resume them. (They might wait again, in which case, re-enqueue them.)
      resume(f)
      if f.alive?
        next_fibers << f
      end
    end

    while @pending_jobs.any?
      # Create a Fiber to consume jobs until one of the jobs yields
      # or jobs run out
      f = Fiber.new {
        while (job = @pending_jobs.shift)
          job.call
        end
      }
      resume(f)
      # In this case, the job yielded. Queue it up to run again after
      # we load whatever it's waiting for.
      if f.alive?
        next_fibers << f
      end
    end

    if pending_fibers.empty?
      # Now, run all Sources which have become pending _before_ resuming GraphQL execution.
      # Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
      #
      # This is where an evented approach would be even better -- can we tell which
      # fibers are ready to continue, and continue execution there?
      #
      source_fiber_stack = if (first_source_fiber = create_source_fiber)
        [first_source_fiber]
      else
        nil
      end

      if source_fiber_stack
        # Use a stack with `.pop` here so that when a source causes another source to become pending,
        # that newly-pending source will run _before_ the one that depends on it.
        # (See below where the old fiber is pushed to the stack, then the new fiber is pushed on the stack.)
        while (outer_source_fiber = source_fiber_stack.pop)
          resume(outer_source_fiber)

          if outer_source_fiber.alive?
            source_fiber_stack << outer_source_fiber
          end
          # If this source caused more sources to become pending, run those before running this one again:
          next_source_fiber = create_source_fiber
          if next_source_fiber
            source_fiber_stack << next_source_fiber
          end
        end
      end
      # Move newly-enqueued Fibers on to the list to be resumed.
      # Clear out the list of next-round Fibers, so that
      # any Fibers that pause can be put on it.
      pending_fibers.concat(next_fibers)
      next_fibers.clear
    end
  end

  if @pending_jobs.any?
    raise "Invariant: #{@pending_jobs.size} pending jobs"
  elsif pending_fibers.any?
    raise "Invariant: #{pending_fibers.size} pending fibers"
  elsif next_fibers.any?
    raise "Invariant: #{next_fibers.size} next fibers"
  end
  nil
end
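
A sketch of the batching behavior described above, using a hypothetical SquareSource (not part of the gem) that implements the Source fetch hook:


class SquareSource < GraphQL::Dataloader::Source
  def fetch(numbers)
    # Called once per batch, with every key requested during step A:
    puts "Fetching: #{numbers.inspect}"
    numbers.map { |n| n * n }
  end
end

dataloader = GraphQL::Dataloader.new

3.times do |i|
  dataloader.append_job do
    # `load` pauses this Fiber until the batch has been fetched (steps B/C):
    puts dataloader.with(SquareSource).load(i)
  end
end

dataloader.run
# "Fetching: [0, 1, 2]" is printed once, then 0, 1, and 4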

#with(source_class, *batch_parameters) ⇒ GraphQL::Dataloader::Source

Get a Source instance from this dataloader, for calling .load(...) or .request(...) on.

Parameters:

  • source_class (Class<GraphQL::Dataloader::Source>)

  • batch_parameters (Array<Object>)

Returns:

  • (GraphQL::Dataloader::Source)

    An instance of source_class, initialized with self, *batch_parameters, and cached for the lifetime of this Multiplex.



# File 'lib/graphql/dataloader.rb', line 46

def with(source_class, *batch_parameters)
  @source_cache[source_class][batch_parameters]
end
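
Given a dataloader instance, repeated calls with the same arguments return the same cached Source (reusing the hypothetical Sources::Record from the overview example):


source_a = dataloader.with(Sources::Record, Team)
source_b = dataloader.with(Sources::Record, Team)
source_a.equal?(source_b) # => true, same instance and same batch

# Different batch parameters get a separate Source (and a separate batch):
dataloader.with(Sources::Record, User)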

#yield ⇒ void

This method returns an undefined value.

Tell the dataloader that this fiber is waiting for data.

Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).



# File 'lib/graphql/dataloader.rb', line 55

def yield
  Fiber.yield
  nil
end
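
In normal use this is not called directly; loading through a Source pauses the Fiber on your behalf. A minimal sketch of calling it by hand inside a job:


dataloader.append_job do
  puts "before waiting"
  dataloader.yield # pause this Fiber; `#run` resumes it after pending sources load
  puts "after waiting"
end

dataloader.run
# Prints "before waiting", runs any pending sources, then prints "after waiting"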