Class: GraphQL::Subscriptions::ActionCableSubscriptions
- Inherits: GraphQL::Subscriptions
  - Object
  - GraphQL::Subscriptions
  - GraphQL::Subscriptions::ActionCableSubscriptions
- Defined in: lib/graphql/subscriptions/action_cable_subscriptions.rb
Overview
A subscriptions implementation that sends data as ActionCable broadcastings.
This implementation is experimental. Some things to keep in mind:
- No queueing system; ActiveJob should be added
- Take care to reload context when re-delivering the subscription. (see Query#subscription_update?)
- Avoid the async ActionCable adapter and use the redis or PostgreSQL adapters instead. Otherwise calling #trigger won’t work from background jobs or the Rails console.
Constant Summary
- SUBSCRIPTION_PREFIX = "graphql-subscription:"
- EVENT_PREFIX = "graphql-event:"
Instance Attribute Summary
Attributes inherited from GraphQL::Subscriptions
Instance Method Summary
- #delete_subscription(subscription_id) ⇒ Object
  The channel was closed, forget about it.
- #deliver(subscription_id, result) ⇒ Object
  This subscription was re-evaluated.
- #execute_all(event, object) ⇒ Object
  An event was triggered; push the data over ActionCable.
- #initialize(serializer: Serialize, **rest) ⇒ ActionCableSubscriptions (constructor)
  A new instance of ActionCableSubscriptions.
- #read_subscription(subscription_id) ⇒ Object
  Return the query from “storage” (in memory).
- #setup_stream(channel, initial_event) ⇒ Object
  Every subscribing channel is listening here, but only one of them takes any action.
- #write_subscription(query, events) ⇒ Object
  A query was run where these events were subscribed to.
Methods inherited from GraphQL::Subscriptions
#broadcastable?, #build_id, #each_subscription_id, #execute, #execute_update, #normalize_name, #trigger, use
Constructor Details
#initialize(serializer: Serialize, **rest) ⇒ ActionCableSubscriptions
Returns a new instance of ActionCableSubscriptions.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 89
def initialize(serializer: Serialize, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } }
  @serializer = serializer
  super
end
Instance Method Details
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 177
def delete_subscription(subscription_id)
  query = @subscriptions.delete(subscription_id)
  events = query.context.namespace(:subscriptions)[:events]
  events.each do |event|
    ev_by_fingerprint = @events[event.topic]
    ev_for_fingerprint = ev_by_fingerprint[event.fingerprint]
    ev_for_fingerprint.delete(event)
    if ev_for_fingerprint.empty?
      ev_by_fingerprint.delete(event.fingerprint)
    end
  end
end
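The cleanup step in #delete_subscription can be tried out without ActionCable. The following is a minimal sketch, not the library's code: it assumes plain Hash and Array in place of Concurrent::Map and Concurrent::Array, and IndexedEvent, new_event_index, and remove_events are hypothetical names introduced only for illustration.

```ruby
# Hypothetical stand-in for a subscription event; only the fields the
# cleanup logic touches are modeled.
IndexedEvent = Struct.new(:topic, :fingerprint, :subscription_id)

# topic => fingerprint => [events]. Plain Hash/Array are used here instead
# of Concurrent::Map/Concurrent::Array, so this sketch is NOT thread-safe.
def new_event_index
  Hash.new { |h, topic| h[topic] = Hash.new { |h2, fp| h2[fp] = [] } }
end

# Mirrors the per-event cleanup: remove the event, then drop the
# fingerprint bucket once nothing is left in it.
def remove_events(index, events)
  events.each do |event|
    by_fingerprint = index[event.topic]
    bucket = by_fingerprint[event.fingerprint]
    bucket.delete(event)
    by_fingerprint.delete(event.fingerprint) if bucket.empty?
  end
end

index = new_event_index
a = IndexedEvent.new("postAdded", "fp-1", "sub-a")
b = IndexedEvent.new("postAdded", "fp-1", "sub-b")
[a, b].each { |ev| index[ev.topic][ev.fingerprint] << ev }

remove_events(index, [a])
p index["postAdded"]["fp-1"].map(&:subscription_id) # ["sub-b"]

remove_events(index, [b])
p index["postAdded"].key?("fp-1")                   # false
```

As in the source, the topic-level entry is left in place; only empty fingerprint buckets are removed.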
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated. Send it to the specific stream where this client was waiting.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 108
def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  ActionCable.server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload)
end
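To make the stream name and payload shape concrete, here is a small sketch that records broadcasts in memory instead of sending them. RecordingServer and deliver_to are hypothetical names, and the recorder is only an assumed stand-in for ActionCable.server; the `more: true` flag is copied from the source as-is.

```ruby
SUBSCRIPTION_PREFIX = "graphql-subscription:"

# Hypothetical in-memory replacement for ActionCable.server; it only
# records what would have been broadcast.
class RecordingServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

# Same shape as #deliver: one stream per subscription id, with the
# result converted to a Hash and wrapped alongside `more: true`.
def deliver_to(server, subscription_id, result)
  payload = { result: result.to_h, more: true }
  server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload)
end

server = RecordingServer.new
deliver_to(server, "abc123", { "data" => { "postAdded" => { "id" => "1" } } })

stream, payload = server.broadcasts.first
puts stream         # graphql-subscription:abc123
puts payload[:more] # true
```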
#execute_all(event, object) ⇒ Object
An event was triggered; push the data over ActionCable. Subscribers will re-evaluate locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 100
def execute_all(event, object)
  stream = EVENT_PREFIX + event.topic
  message = @serializer.dump(object)
  ActionCable.server.broadcast(stream, message)
end
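The dump-broadcast-load round trip behind #execute_all can be sketched with an in-process bus. This is an illustration under stated assumptions: JsonSerializer is a hypothetical serializer exposing the same dump/load interface the class calls on its `serializer:` option, and LocalBus is a hypothetical stand-in for ActionCable, not its API.

```ruby
require "json"

EVENT_PREFIX = "graphql-event:"

# Hypothetical serializer with the dump/load interface used by the class.
# JSON is an assumption here, not the library's default serializer.
module JsonSerializer
  def self.dump(obj)
    JSON.generate(obj)
  end

  def self.load(str)
    JSON.parse(str)
  end
end

# Tiny in-process pub/sub standing in for ActionCable.
class LocalBus
  def initialize
    @listeners = Hash.new { |h, stream| h[stream] = [] }
  end

  def stream_from(stream, &block)
    @listeners[stream] << block
  end

  def broadcast(stream, message)
    @listeners[stream].each { |listener| listener.call(message) }
  end
end

# Publisher side: dump the object and broadcast it on the topic's stream.
def publish_event(bus, topic, object, serializer: JsonSerializer)
  bus.broadcast(EVENT_PREFIX + topic, serializer.dump(object))
end

bus = LocalBus.new
received = []
# Subscriber side: load the message back into an object before re-evaluating.
bus.stream_from(EVENT_PREFIX + "postAdded") do |message|
  received << JsonSerializer.load(message)
end

publish_event(bus, "postAdded", { "id" => 1, "title" => "Hello" })
p received # [{"id"=>1, "title"=>"Hello"}]
```

Only the serialized object travels over the event stream; each subscriber process re-runs its own stored queries against it.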
#read_subscription(subscription_id) ⇒ Object
Return the query from “storage” (in memory).
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 166
def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: query.context.to_h,
    operation_name: query.operation_name,
  }
end
#setup_stream(channel, initial_event) ⇒ Object
Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.
But the problem is, any channel could close at any time, so each channel has to be ready to take over the primary position.
To make sure there’s always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 142
def setup_stream(channel, initial_event)
  topic = initial_event.topic
  channel.stream_from(EVENT_PREFIX + topic, coder: ActiveSupport::JSON) do |message|
    object = @serializer.load(message)
    events_by_fingerprint = @events[topic]
    events_by_fingerprint.each do |_fingerprint, events|
      if events.any? && events.first == initial_event
        # The fingerprint has told us that this response should be shared by all subscribers,
        # so just run it once, then deliver the result to every subscriber
        first_event = events.first
        first_subscription_id = first_event.context.fetch(:subscription_id)
        result = execute_update(first_subscription_id, first_event, object)
        # Having calculated the result _once_, send the same payload to all subscribers
        events.each do |event|
          subscription_id = event.context.fetch(:subscription_id)
          deliver(subscription_id, result)
        end
      end
    end
    nil
  end
end
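The "first event on the list" rule can be checked in isolation. The sketch below assumes one listener per event and models only the `events.first == initial_event` test from the code above; ListenerEvent and acting_listeners are hypothetical names. Note that the check is applied per fingerprint, so one listener acts for each fingerprint bucket.

```ruby
# Hypothetical event; `id` stands in for the subscription id.
ListenerEvent = Struct.new(:id, :topic, :fingerprint)

# For one incoming message, return the ids of the listeners that would
# build and publish a payload. `events_by_fingerprint` maps a fingerprint
# to its ordered list of events; each event has its own listener.
def acting_listeners(events_by_fingerprint)
  all_events = events_by_fingerprint.values.flatten
  acting = all_events.select do |initial_event|
    events_by_fingerprint.any? do |_fingerprint, events|
      events.any? && events.first == initial_event
    end
  end
  acting.map(&:id)
end

e1 = ListenerEvent.new("sub-1", "postAdded", "fp-1")
e2 = ListenerEvent.new("sub-2", "postAdded", "fp-1")
e3 = ListenerEvent.new("sub-3", "postAdded", "fp-2")
index = { "fp-1" => [e1, e2], "fp-2" => [e3] }

p acting_listeners(index) # ["sub-1", "sub-3"]

# When the primary's channel closes and its event is removed,
# the next event in the list takes over.
index["fp-1"].delete(e1)
p acting_listeners(index) # ["sub-2", "sub-3"]
```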
#write_subscription(query, events) ⇒ Object
A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 117
def write_subscription(query, events)
  channel = query.context.fetch(:channel)
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = query.context[:action_cable_stream] ||= SUBSCRIPTION_PREFIX + subscription_id
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end
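The two `||=` assignments mean that an id or stream name already present in the query context is reused rather than overwritten. A minimal sketch of that behavior, assuming a plain Hash for the context and SecureRandom.uuid as a stand-in for the inherited #build_id (whose actual implementation is not shown here); assign_stream and STREAM_PREFIX are hypothetical names.

```ruby
require "securerandom"

STREAM_PREFIX = "graphql-subscription:"

# Assign an id and stream name only when the context does not already
# carry them, mirroring the two `||=` assignments in #write_subscription.
# SecureRandom.uuid is an assumption standing in for #build_id.
def assign_stream(context)
  subscription_id = context[:subscription_id] ||= SecureRandom.uuid
  context[:action_cable_stream] ||= STREAM_PREFIX + subscription_id
end

# A context that already carries an id keeps it.
preset = { subscription_id: "client-chosen-id" }
puts assign_stream(preset) # graphql-subscription:client-chosen-id

# A fresh context gets a generated id, and repeated calls are stable.
fresh = {}
first = assign_stream(fresh)
puts assign_stream(fresh) == first # true
```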