Class: GraphQL::Dataloader

Inherits:
Object
  • Object
show all
Defined in:
lib/graphql/dataloader.rb,
lib/graphql/dataloader/source.rb,
lib/graphql/dataloader/request.rb,
lib/graphql/dataloader/request_all.rb,
lib/graphql/dataloader/null_dataloader.rb,
lib/graphql/dataloader/async_dataloader.rb,
lib/graphql/dataloader/active_record_source.rb,
lib/graphql/dataloader/active_record_association_source.rb

Overview

This plugin supports Fiber-based concurrency, along with Source.

Examples:

Installing Dataloader


class MySchema < GraphQL::Schema
  use GraphQL::Dataloader
end

Waiting for batch-loaded data in a GraphQL field


field :team, Types::Team, null: true

def team
  dataloader.with(Sources::Record, Team).load(object.team_id)
end

Direct Known Subclasses

AsyncDataloader, NullDataloader

Defined Under Namespace

Classes: ActiveRecordAssociationSource, ActiveRecordSource, AsyncDataloader, NullDataloader, Request, RequestAll, Source

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit) ⇒ Dataloader

Returns a new instance of Dataloader.



60
61
62
63
64
65
66
67
# File 'lib/graphql/dataloader.rb', line 60

def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit)
  @source_cache = Hash.new { |h, k| h[k] = {} }
  @pending_jobs = []
  if !nonblocking.nil?
    @nonblocking = nonblocking
  end
  @fiber_limit = fiber_limit
end

Class Attribute Details

.default_fiber_limitObject

Returns the value of attribute default_fiber_limit.



29
30
31
# File 'lib/graphql/dataloader.rb', line 29

def default_fiber_limit
  @default_fiber_limit
end

.default_nonblockingObject

Returns the value of attribute default_nonblocking.



29
30
31
# File 'lib/graphql/dataloader.rb', line 29

def default_nonblocking
  @default_nonblocking
end

Instance Attribute Details

#fiber_limitInteger? (readonly)

Returns:

  • (Integer, nil)


70
71
72
# File 'lib/graphql/dataloader.rb', line 70

def fiber_limit
  @fiber_limit
end

Class Method Details

.use(schema, nonblocking: nil, fiber_limit: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/graphql/dataloader.rb', line 32

def self.use(schema, nonblocking: nil, fiber_limit: nil)
  dataloader_class = if nonblocking
    warn("`nonblocking: true` is deprecated from `GraphQL::Dataloader`, please use `GraphQL::Dataloader::AsyncDataloader` instead. Docs: https://graphql-ruby.org/dataloader/async_dataloader.")
    Class.new(self) { self.default_nonblocking = true }
  else
    self
  end

  if fiber_limit
    dataloader_class = Class.new(dataloader_class)
    dataloader_class.default_fiber_limit = fiber_limit
  end

  schema.dataloader_class = dataloader_class
end

.with_dataloading(&block) ⇒ Object

Call the block with a Dataloader instance, then run all enqueued jobs and return the result of the block.



50
51
52
53
54
55
56
57
58
# File 'lib/graphql/dataloader.rb', line 50

def self.with_dataloading(&block)
  dataloader = self.new
  result = nil
  dataloader.append_job {
    result = block.call(dataloader)
  }
  dataloader.run
  result
end

Instance Method Details

#append_job(&job) ⇒ Object



143
144
145
146
147
148
# File 'lib/graphql/dataloader.rb', line 143

def append_job(&job)
  # Given a block, queue it up to be worked through when `#run` is called.
  # (If the dataloader is already running, than a Fiber will pick this up later.)
  @pending_jobs.push(job)
  nil
end

#cleanup_fiberObject

This method is called when Dataloader is finished using a fiber. Use it to perform any cleanup, such as releasing database connections (if required manually)



101
102
# File 'lib/graphql/dataloader.rb', line 101

def cleanup_fiber
end

#clear_cachevoid

This method returns an undefined value.

Clear any already-loaded objects from Source caches



152
153
154
155
156
157
# File 'lib/graphql/dataloader.rb', line 152

def clear_cache
  @source_cache.each do |_source_class, batched_sources|
    batched_sources.each_value(&:clear_cache)
  end
  nil
end

#get_fiber_variablesHash<Symbol, Object>

This is called before the fiber is spawned, from the parent context (i.e. from the thread or fiber that it is scheduled from).

Returns:

  • (Hash<Symbol, Object>)

    Current fiber-local variables



80
81
82
83
84
85
86
# File 'lib/graphql/dataloader.rb', line 80

def get_fiber_variables
  fiber_vars = {}
  Thread.current.keys.each do |fiber_var_key|
    fiber_vars[fiber_var_key] = Thread.current[fiber_var_key]
  end
  fiber_vars
end

#merge_records(records, index_by: :id) ⇒ void

This method returns an undefined value.

Pre-warm the Dataloader cache with ActiveRecord objects which were loaded elsewhere. These will be used by ActiveRecordSource, ActiveRecordAssociationSource and their helper methods, dataload_record and dataload_association.

Parameters:

  • records (Array<ActiveRecord::Base>)

    Already-loaded records to warm the cache with

  • index_by (Symbol) (defaults to: :id)

    The attribute to use as the cache key. (Should match find_by: when using ActiveRecordSource)



266
267
268
269
270
271
272
273
274
# File 'lib/graphql/dataloader.rb', line 266

def merge_records(records, index_by: :id)
  records_by_class = Hash.new { |h, k| h[k] = {} }
  records.each do |r|
    records_by_class[r.class][r.public_send(index_by)] = r
  end
  records_by_class.each do |r_class, records|
    with(ActiveRecordSource, r_class).merge(records)
  end
end

#nonblocking?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/graphql/dataloader.rb', line 72

def nonblocking?
  @nonblocking
end

#runObject



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/graphql/dataloader.rb', line 191

def run
  trace = Fiber[:__graphql_current_multiplex]&.current_trace
  jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
  job_fibers = []
  next_job_fibers = []
  source_fibers = []
  next_source_fibers = []
  first_pass = true
  manager = spawn_fiber do
    trace&.begin_dataloader(self)
    while first_pass || !job_fibers.empty?
      first_pass = false

      while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
        if f.alive?
          finished = run_fiber(f)
          if !finished
            next_job_fibers << f
          end
        end
      end
      join_queues(job_fibers, next_job_fibers)

      while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
        while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
          if f.alive?
            finished = run_fiber(f)
            if !finished
              next_source_fibers << f
            end
          end
        end
        join_queues(source_fibers, next_source_fibers)
      end
    end

    trace&.end_dataloader(self)
  end

  run_fiber(manager)

  if manager.alive?
    raise "Invariant: Manager fiber didn't terminate properly."
  end

  if !job_fibers.empty?
    raise "Invariant: job fibers should have exited but #{job_fibers.size} remained"
  end
  if !source_fibers.empty?
    raise "Invariant: source fibers should have exited but #{source_fibers.size} remained"
  end

rescue UncaughtThrowError => e
  throw e.tag, e.value
end

#run_fiber(f) ⇒ Object



247
248
249
# File 'lib/graphql/dataloader.rb', line 247

def run_fiber(f)
  f.resume
end

#run_isolatedObject

Use a self-contained queue for the work in the block.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/graphql/dataloader.rb', line 160

def run_isolated
  prev_queue = @pending_jobs
  prev_pending_keys = {}
  @source_cache.each do |source_class, batched_sources|
    batched_sources.each do |batch_args, batched_source_instance|
      if batched_source_instance.pending?
        prev_pending_keys[batched_source_instance] = batched_source_instance.pending.dup
        batched_source_instance.pending.clear
      end
    end
  end

  @pending_jobs = []
  res = nil
  # Make sure the block is inside a Fiber, so it can `Fiber.yield`
  append_job {
    res = yield
  }
  run
  res
ensure
  @pending_jobs = prev_queue
  prev_pending_keys.each do |source_instance, pending|
    pending.each do |key, value|
      if !source_instance.results.key?(key)
        source_instance.pending[key] = value
      end
    end
  end
end

#set_fiber_variables(vars) ⇒ void

This method returns an undefined value.

Set up the fiber variables in a new fiber.

This is called within the fiber, right after it is spawned.

Parameters:



94
95
96
97
# File 'lib/graphql/dataloader.rb', line 94

def set_fiber_variables(vars)
  vars.each { |k, v| Thread.current[k] = v }
  nil
end

#spawn_fiberObject



251
252
253
254
255
256
257
258
# File 'lib/graphql/dataloader.rb', line 251

def spawn_fiber
  fiber_vars = get_fiber_variables
  Fiber.new(blocking: !@nonblocking) {
    set_fiber_variables(fiber_vars)
    yield
    cleanup_fiber
  }
end

#with(source_class, *batch_args, **batch_kwargs) ⇒ Object

truffle-ruby wasn’t doing well with the implementation below



111
112
113
114
115
116
117
118
# File 'lib/graphql/dataloader.rb', line 111

def with(source_class, *batch_args)
  batch_key = source_class.batch_key_for(*batch_args)
  @source_cache[source_class][batch_key] ||= begin
    source = source_class.new(*batch_args)
    source.setup(self)
    source
  end
end

#yield(source = ) ⇒ void

This method returns an undefined value.

Tell the dataloader that this fiber is waiting for data.

Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).



134
135
136
137
138
139
140
# File 'lib/graphql/dataloader.rb', line 134

def yield(source = Fiber[:__graphql_current_dataloader_source])
  trace = Fiber[:__graphql_current_multiplex]&.current_trace
  trace&.dataloader_fiber_yield(source)
  Fiber.yield
  trace&.dataloader_fiber_resume(source)
  nil
end