Class: GraphQL::Subscriptions::ActionCableSubscriptions
- Inherits: GraphQL::Subscriptions
  - Object
  - GraphQL::Subscriptions
  - GraphQL::Subscriptions::ActionCableSubscriptions
- Defined in: lib/graphql/subscriptions/action_cable_subscriptions.rb
Overview
A subscriptions implementation that sends data as ActionCable broadcastings.
Some things to keep in mind:
- There is no queueing system; ActiveJob should be added.
- Take care to reload context when re-delivering the subscription. (see Query#subscription_update?)
- Avoid the async ActionCable adapter and use the Redis or PostgreSQL adapter instead. Otherwise, calling #trigger won’t work from background jobs or the Rails console.
Constant Summary
- SUBSCRIPTION_PREFIX = "graphql-subscription:"
- EVENT_PREFIX = "graphql-event:"
Instance Attribute Summary
Attributes inherited from GraphQL::Subscriptions
Instance Method Summary
- #delete_subscription(subscription_id) ⇒ Object
  The channel was closed, forget about it.
- #deliver(subscription_id, result) ⇒ Object
  This subscription was re-evaluated.
- #execute_all(event, object) ⇒ Object
  An event was triggered; push the data over ActionCable.
- #initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions (constructor)
  A new instance of ActionCableSubscriptions.
- #read_subscription(subscription_id) ⇒ Object
  Return the query from “storage” (in memory).
- #setup_stream(channel, initial_event) ⇒ Object
  Every subscribing channel is listening here, but only one of them takes any action.
- #write_subscription(query, events) ⇒ Object
  A query was run where these events were subscribed to.
Methods inherited from GraphQL::Subscriptions
#broadcastable?, #build_id, #each_subscription_id, #execute, #execute_update, #normalize_name, #trigger, use
Constructor Details
#initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions
Returns a new instance of ActionCableSubscriptions.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 90
def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } }
  @action_cable = action_cable
  @action_cable_coder = action_cable_coder
  @serializer = serializer
  @transmit_ns = namespace
  super
end
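The nested default blocks on @events can be hard to read at a glance. The following is a minimal sketch of the same shape (topic, then fingerprint, then a list of events), using plain Hash objects as a stand-in for Concurrent::Map. The helper name build_events_index and the sample keys are hypothetical, and a plain Hash does not give the thread-safety that Concurrent::Map provides.

```ruby
# Plain-Hash stand-in for the nested Concurrent::Map built in #initialize.
# Outer key: event topic. Inner key: event fingerprint. Value: list of events.
# (Hypothetical helper; a plain Hash is NOT safe for concurrent writers.)
def build_events_index
  Hash.new do |by_topic, topic|
    by_topic[topic] = Hash.new do |by_fingerprint, fingerprint|
      by_fingerprint[fingerprint] = []
    end
  end
end

events = build_events_index

# Missing levels are created on first access, so no explicit setup is needed.
events["post_added"]["fingerprint-1"] << :event_a
events["post_added"]["fingerprint-1"] << :event_b
events["post_added"]["fingerprint-2"] << :event_c

puts events["post_added"].keys.inspect
puts events["post_added"]["fingerprint-1"].length
```

This auto-creation is what lets #write_subscription append with `@events[event.topic][event.fingerprint] << event` without checking whether the topic or fingerprint has been seen before.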
Instance Method Details
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 188
def delete_subscription(subscription_id)
  query = @subscriptions.delete(subscription_id)
  # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel,
  # see https://github.com/rmosolgo/graphql-ruby/issues/2478
  if query
    events = query.context.namespace(:subscriptions)[:events]
    events.each do |event|
      ev_by_fingerprint = @events[event.topic]
      ev_for_fingerprint = ev_by_fingerprint[event.fingerprint]
      ev_for_fingerprint.delete(event)
      if ev_for_fingerprint.empty?
        ev_by_fingerprint.delete(event.fingerprint)
      end
    end
  end
end
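The cleanup step above can be sketched in isolation. This example assumes a plain nested Hash in place of @events and a hypothetical CleanupEvent struct in place of the library's event objects; forget_events is an illustrative name, not a library method.

```ruby
# Hypothetical stand-in for the library's event objects.
CleanupEvent = Struct.new(:topic, :fingerprint, :subscription_id)

# Remove each event from its fingerprint list, and drop the fingerprint
# entry once its list is empty (mirrors the loop in #delete_subscription).
def forget_events(events_index, events)
  events.each do |event|
    by_fingerprint = events_index[event.topic]
    list = by_fingerprint[event.fingerprint]
    list.delete(event)
    by_fingerprint.delete(event.fingerprint) if list.empty?
  end
end

a = CleanupEvent.new("post_added", "fp-1", "sub-a")
b = CleanupEvent.new("post_added", "fp-1", "sub-b")
c = CleanupEvent.new("post_added", "fp-2", "sub-c")
index = { "post_added" => { "fp-1" => [a, b], "fp-2" => [c] } }

forget_events(index, [a]) # "fp-1" still holds sub-b, so it is kept
forget_events(index, [c]) # "fp-2" is now empty, so it is removed

puts index["post_added"].keys.inspect
```

As in the method above, only the fingerprint entry is removed; the topic entry itself stays in the index.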
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated. Send it to the specific stream where this client was waiting.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 112
def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload)
end
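To make the payload shape concrete, here is a small sketch that records what would be broadcast. RecordingServer and deliver_sketch are hypothetical stand-ins for `ActionCable.server` and #deliver, and the stream name is passed in as an opaque string because the body of stream_subscription_name is not shown on this page.

```ruby
# Hypothetical stand-in for ActionCable.server; it only records broadcasts.
class RecordingServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

# Mirrors the body of #deliver: wrap the result and flag that more may follow.
def deliver_sketch(server, stream_name, result_hash)
  payload = { result: result_hash, more: true }
  server.broadcast(stream_name, payload)
end

server = RecordingServer.new
deliver_sketch(server, "example-stream", { "data" => { "postAdded" => { "id" => 1 } } })

stream, payload = server.broadcasts.first
puts stream
puts payload.inspect
```

A recording object like this could presumably also be passed through the constructor's `action_cable:` option in tests, provided it responds to `server`; that usage is an assumption, not something this page documents.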
#execute_all(event, object) ⇒ Object
An event was triggered; push the data over ActionCable. Subscribers will re-evaluate locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 104
def execute_all(event, object)
  stream = stream_event_name(event)
  message = @serializer.dump(object)
  @action_cable.server.broadcast(stream, message)
end
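The trigger side and the listener side have to agree on a serializer: #execute_all calls `dump` before broadcasting, and #setup_stream calls `load` on what it receives. The sketch below shows that round trip with JSON as a stand-in; JsonSerializer and MemoryServer are hypothetical, and the library's own default Serialize module may encode objects differently.

```ruby
require "json"

# Hypothetical serializer exposing the same dump/load interface that
# #execute_all and #setup_stream rely on. JSON is only a stand-in here.
module JsonSerializer
  def self.dump(object)
    JSON.generate(object)
  end

  def self.load(message)
    JSON.parse(message)
  end
end

# Hypothetical in-memory server that keeps the last message per stream.
class MemoryServer
  attr_reader :messages

  def initialize
    @messages = {}
  end

  def broadcast(stream, message)
    @messages[stream] = message
  end
end

# Trigger side: serialize the object, then broadcast it.
def execute_all_sketch(server, serializer, stream, object)
  message = serializer.dump(object)
  server.broadcast(stream, message)
end

server = MemoryServer.new
execute_all_sketch(server, JsonSerializer, "example-event-stream", { "id" => 1, "title" => "Hello" })

# Listener side: load the object back before re-evaluating the query.
received = JsonSerializer.load(server.messages["example-event-stream"])
puts received.inspect
```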
#read_subscription(subscription_id) ⇒ Object
Return the query from “storage” (in memory).
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 170
def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  if query.nil?
    # This can happen when a subscription is triggered from an unsubscribed channel,
    # see https://github.com/rmosolgo/graphql-ruby/issues/2478.
    # (This `nil` is handled by `#execute_update`)
    nil
  else
    {
      query_string: query.query_string,
      variables: query.provided_variables,
      context: query.context.to_h,
      operation_name: query.operation_name,
    }
  end
end
#setup_stream(channel, initial_event) ⇒ Object
Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.
However, any channel could close at any time, so each channel has to be ready to take over the primary position.
To make sure there’s always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 146
def setup_stream(channel, initial_event)
  topic = initial_event.topic
  channel.stream_from(stream_event_name(initial_event), coder: @action_cable_coder) do |message|
    object = @serializer.load(message)
    events_by_fingerprint = @events[topic]
    events_by_fingerprint.each do |_fingerprint, events|
      if events.any? && events.first == initial_event
        # The fingerprint has told us that this response should be shared by all subscribers,
        # so just run it once, then deliver the result to every subscriber
        first_event = events.first
        first_subscription_id = first_event.context.fetch(:subscription_id)
        result = execute_update(first_subscription_id, first_event, object)
        # Having calculated the result _once_, send the same payload to all subscribers
        events.each do |event|
          subscription_id = event.context.fetch(:subscription_id)
          deliver(subscription_id, result)
        end
      end
    end
    nil
  end
end
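The "first event on the list builds the payload" rule can be sketched without ActionCable. In this example, LeaderEvent and handle_broadcast are hypothetical names; the block passed to handle_broadcast stands in for #execute_update, and the returned pairs stand in for calls to #deliver.

```ruby
# Hypothetical stand-in for the library's event objects.
LeaderEvent = Struct.new(:subscription_id)

# Each listener runs this when a message arrives. Only the listener whose
# event is first in a fingerprint's list evaluates the query (via the block)
# and fans the single result out to every event in that list.
def handle_broadcast(listener_event, events_by_fingerprint)
  deliveries = []
  events_by_fingerprint.each do |_fingerprint, events|
    next unless events.any? && events.first == listener_event

    result = yield(events.first) # evaluated once per fingerprint
    events.each { |event| deliveries << [event.subscription_id, result] }
  end
  deliveries
end

a = LeaderEvent.new("sub-a")
b = LeaderEvent.new("sub-b")
index = { "fp-1" => [a, b] }
evaluations = 0

from_a = handle_broadcast(a, index) { |_event| evaluations += 1; "payload" }
from_b = handle_broadcast(b, index) { |_event| evaluations += 1; "payload" }
puts from_a.inspect  # a is first, so it delivers to both subscribers
puts from_b.inspect  # b is not first, so it does nothing
puts evaluations

# If a's channel closes, b becomes first and takes over.
index["fp-1"].delete(a)
puts handle_broadcast(b, index) { |_event| "payload" }.inspect
```

Because leadership is derived from list position rather than stored separately, removing the current first event in #delete_subscription is enough to hand the role to the next listener.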
#write_subscription(query, events) ⇒ Object
A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 121
def write_subscription(query, events)
  channel = query.context.fetch(:channel)
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = stream_subscription_name(subscription_id)
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end