Class: GraphQL::Subscriptions::ActionCableSubscriptions

Inherits:
GraphQL::Subscriptions
Defined in:
lib/graphql/subscriptions/action_cable_subscriptions.rb

Overview

A subscriptions implementation that sends data as ActionCable broadcastings.

Experimental. Some things to keep in mind:

  • No queueing system is built in; ActiveJob support should be added.
  • Take care to reload context when re-delivering the subscription (see Query#subscription_update?).
  • Avoid the async ActionCable adapter and use the Redis or PostgreSQL adapter instead. Otherwise, calling #trigger won’t work from background jobs or the Rails console.

Examples:

Adding ActionCableSubscriptions to your schema

class MySchema < GraphQL::Schema
  # ...
  use GraphQL::Subscriptions::ActionCableSubscriptions
end

Implementing a channel for GraphQL Subscriptions

class GraphqlChannel < ApplicationCable::Channel
  def subscribed
    @subscription_ids = []
  end

  def execute(data)
    query = data["query"]
    variables = ensure_hash(data["variables"])
    operation_name = data["operationName"]
    context = {
      # Re-implement whatever context methods you need
      # in this channel or ApplicationCable::Channel
      # current_user: current_user,
      # Make sure the channel is in the context
      channel: self,
    }

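    # Pass keyword arguments directly; wrapping them in a Hash literal
    # is not treated as keywords on Ruby 3+.
    result = MySchema.execute(
      query: query,
      context: context,
      variables: variables,
      operation_name: operation_name
    )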

    payload = {
      result: result.to_h,
      more: result.subscription?,
    }

    # Track the subscription here so we can remove it
    # on unsubscribe.
    if result.context[:subscription_id]
      @subscription_ids << result.context[:subscription_id]
    end

    transmit(payload)
  end

  def unsubscribed
    @subscription_ids.each { |sid|
      MySchema.subscriptions.delete_subscription(sid)
    }
  end

  private

    def ensure_hash(ambiguous_param)
      case ambiguous_param
      when String
        if ambiguous_param.present?
          ensure_hash(JSON.parse(ambiguous_param))
        else
          {}
        end
      when Hash, ActionController::Parameters
        ambiguous_param
      when nil
        {}
      else
        raise ArgumentError, "Unexpected parameter: #{ambiguous_param}"
      end
    end
end

Constant Summary

SUBSCRIPTION_PREFIX =
"graphql-subscription:"
EVENT_PREFIX =
"graphql-event:"

Instance Attribute Summary

Attributes inherited from GraphQL::Subscriptions

#default_broadcastable

Instance Method Summary

Methods inherited from GraphQL::Subscriptions

#broadcastable?, #build_id, #each_subscription_id, #execute, #execute_update, #normalize_name, #trigger, use

Constructor Details

#initialize(serializer: Serialize, **rest) ⇒ ActionCableSubscriptions

Returns a new instance of ActionCableSubscriptions.

Parameters:

  • serializer (an object responding to #dump(obj) and #load(string)) (defaults to: Serialize): Used for serializing messages before handing them to .broadcast(msg).
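The serializer contract above is duck-typed: anything with #dump(obj) and #load(string) should qualify. As a minimal sketch, assuming only plain data (hashes, arrays, strings, numbers) is ever broadcast, a JSON-backed stand-in could look like this. JsonSerializer is a hypothetical name, not part of the library.

```ruby
require "json"

# Hypothetical serializer (an assumption for illustration): any object that
# responds to #dump(obj) and #load(string) fits the documented contract.
module JsonSerializer
  module_function

  # Turn a plain Ruby object into a String suitable for broadcasting.
  def dump(obj)
    JSON.generate(obj)
  end

  # Rebuild the object from the String produced by .dump.
  def load(string)
    JSON.parse(string)
  end
end

message = JsonSerializer.dump({ "id" => 1, "title" => "Hello" })
restored = JsonSerializer.load(message)
```

Such an object would presumably be handed over with `use GraphQL::Subscriptions::ActionCableSubscriptions, serializer: JsonSerializer`. The default Serialize may handle more than plain JSON types, so this stand-in is only reasonable when triggered objects are simple data.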



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 89

def initialize(serializer: Serialize, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } }
  @serializer = serializer
  super
end
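The nested default blocks in @events build a two-level index: topic, then fingerprint, then a list of events. The sketch below reproduces that shape with plain Hash objects so it runs without concurrent-ruby; unlike Concurrent::Map it is not thread-safe, and the topic and fingerprint strings are made up for illustration.

```ruby
# Plain-Hash stand-in for the nested Concurrent::Map (not thread-safe).
# Missing topics and fingerprints are created on first access.
events = Hash.new do |by_topic, topic|
  by_topic[topic] = Hash.new do |by_fingerprint, fingerprint|
    by_fingerprint[fingerprint] = []
  end
end

# Hypothetical topic and fingerprint values.
events["postAdded"]["fp-1"] << :event_a
events["postAdded"]["fp-1"] << :event_b
events["postAdded"]["fp-2"] << :event_c

topics = events.keys
fingerprints = events["postAdded"].keys
```

Because the inner containers are created lazily, callers can append with `events[topic][fingerprint] << event` without checking for existence first, which is the pattern #write_subscription relies on.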

Instance Method Details

#delete_subscription(subscription_id) ⇒ Object

The channel was closed, forget about it.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 177

def delete_subscription(subscription_id)
  query = @subscriptions.delete(subscription_id)
  events = query.context.namespace(:subscriptions)[:events]
  events.each do |event|
    ev_by_fingerprint = @events[event.topic]
    ev_for_fingerprint = ev_by_fingerprint[event.fingerprint]
    ev_for_fingerprint.delete(event)
    if ev_for_fingerprint.empty?
      ev_by_fingerprint.delete(event.fingerprint)
    end
  end
end
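In the listing above, an unknown subscription_id would presumably leave `query` as nil, and `query.context` would then raise. The following is a minimal sketch of the same cleanup with an early return for that case. It uses hypothetical stand-ins (SubscribedEvent, InMemoryRegistry) and plain Hashes rather than the real query, event, and Concurrent::Map objects.

```ruby
# Hypothetical stand-in for the real event objects. The subscription_id
# field keeps events from different subscriptions distinct under ==.
SubscribedEvent = Struct.new(:topic, :fingerprint, :subscription_id)

class InMemoryRegistry
  attr_reader :subscriptions, :events

  def initialize
    @subscriptions = {}
    @events = Hash.new { |h, topic| h[topic] = Hash.new { |h2, fp| h2[fp] = [] } }
  end

  # Remember which events a subscription listens to.
  def write(subscription_id, subscribed_events)
    @subscriptions[subscription_id] = subscribed_events
    subscribed_events.each { |ev| @events[ev.topic][ev.fingerprint] << ev }
  end

  # Same cleanup as the listing above, plus a guard for unknown IDs.
  def delete(subscription_id)
    subscribed_events = @subscriptions.delete(subscription_id)
    return if subscribed_events.nil?

    subscribed_events.each do |ev|
      by_fingerprint = @events[ev.topic]
      list = by_fingerprint[ev.fingerprint]
      list.delete(ev)
      # Drop the fingerprint entry once nobody is listening on it.
      by_fingerprint.delete(ev.fingerprint) if list.empty?
    end
  end
end

registry = InMemoryRegistry.new
registry.write("sub-1", [SubscribedEvent.new("postAdded", "fp-1", "sub-1")])
registry.write("sub-2", [SubscribedEvent.new("postAdded", "fp-1", "sub-2")])
registry.delete("sub-1")
registry.delete("missing") # unknown ID: returns without raising
```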

#deliver(subscription_id, result) ⇒ Object

This subscription was re-evaluated. Send it to the specific stream where this client was waiting.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 108

def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  ActionCable.server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload)
end
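To make the stream name and payload shape concrete, here is a small sketch that mirrors #deliver against a recording object instead of ActionCable.server. FakeCableServer and the sample result hash are assumptions for illustration; only the prefix and the `{ result:, more: }` payload come from the listing above.

```ruby
SUBSCRIPTION_PREFIX = "graphql-subscription:"

# Hypothetical recorder standing in for ActionCable.server.
class FakeCableServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  # Record the broadcast instead of sending it anywhere.
  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

# Mirrors #deliver, with the server passed in explicitly.
def deliver(server, subscription_id, result_hash)
  payload = { result: result_hash, more: true }
  server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload)
end

server = FakeCableServer.new
deliver(server, "abc123", { "data" => { "postAdded" => { "id" => "1" } } })
stream, payload = server.broadcasts.first
```

Each subscription gets its own stream, named by prefix plus ID, so an update reaches only the client that registered that subscription. `more: true` matches the flag the channel example sends for a live subscription.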

#execute_all(event, object) ⇒ Object

An event was triggered; push the data over ActionCable. Subscribers will re-evaluate locally.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 100

def execute_all(event, object)
  stream = EVENT_PREFIX + event.topic
  message = @serializer.dump(object)
  ActionCable.server.broadcast(stream, message)
end

#read_subscription(subscription_id) ⇒ Object

Return the stored query's data from “storage” (in memory) as a Hash.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 166

def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: query.context.to_h,
    operation_name: query.operation_name,
  }
end

#setup_stream(channel, initial_event) ⇒ Object

Every subscribing channel is listening here, but only one of them takes any action. This lets us reuse payloads when possible and build one payload to send to all subscribers.

However, any channel could close at any time, so each channel has to be ready to take over the primary position.

To make sure there is always exactly one channel building payloads, the listener belonging to the first event on the list is the one that builds and publishes payloads.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 142

def setup_stream(channel, initial_event)
  topic = initial_event.topic
  channel.stream_from(EVENT_PREFIX + topic, coder: ActiveSupport::JSON) do |message|
    object = @serializer.load(message)
    events_by_fingerprint = @events[topic]
    events_by_fingerprint.each do |_fingerprint, events|
      if events.any? && events.first == initial_event
        # The fingerprint has told us that this response should be shared by all subscribers,
        # so just run it once, then deliver the result to every subscriber
        first_event = events.first
        first_subscription_id = first_event.context.fetch(:subscription_id)
        result = execute_update(first_subscription_id, first_event, object)
        # Having calculated the result _once_, send the same payload to all subscribers
        events.each do |event|
          subscription_id = event.context.fetch(:subscription_id)
          deliver(subscription_id, result)
        end
      end
    end
    nil
  end
end
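The "first event on the list" rule can be simulated without ActionCable. In this sketch every event gets its own listener, but only the listener whose event is currently first builds a payload and fans it out. StreamEvent, the lambdas, and the string payload are hypothetical stand-ins; a single fingerprint group is assumed.

```ruby
# Hypothetical stand-in: each event knows only its subscription ID.
StreamEvent = Struct.new(:subscription_id)

# Events sharing one topic and fingerprint, in subscription order.
events = [StreamEvent.new("sub-1"), StreamEvent.new("sub-2"), StreamEvent.new("sub-3")]
delivered = []

# Build one listener per event, as setup_stream does per subscribing channel.
make_listener = lambda do |initial_event|
  lambda do |object|
    # Only the listener whose event is first in the list takes action.
    next unless events.any? && events.first == initial_event

    result = "payload for #{object}" # computed once per update
    events.each { |event| delivered << [event.subscription_id, result] }
  end
end

listeners = events.map { |event| make_listener.call(event) }

# First update: all three listeners fire, only sub-1's does the work.
listeners.each { |listener| listener.call("update-1") }

# sub-1 closes: its event and listener go away, so sub-2 becomes primary.
events.shift
listeners.shift
listeners.each { |listener| listener.call("update-2") }
```

No explicit hand-off is needed: whichever event is first after a removal makes its listener the primary on the next update.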

#write_subscription(query, events) ⇒ Object

A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 117

def write_subscription(query, events)
  channel = query.context.fetch(:channel)
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = query.context[:action_cable_stream] ||= SUBSCRIPTION_PREFIX + subscription_id
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end