Class: GraphQL::Subscriptions::ActionCableSubscriptions
- Inherits: GraphQL::Subscriptions
  - Object
    - GraphQL::Subscriptions
      - GraphQL::Subscriptions::ActionCableSubscriptions
- Defined in: lib/graphql/subscriptions/action_cable_subscriptions.rb
Overview
A subscriptions implementation that sends data as ActionCable broadcastings.
Some things to keep in mind:
- There is no queueing system; ActiveJob should be added.
- Take care to reload context when re-delivering the subscription. (see Query#subscription_update?)
- Avoid the async ActionCable adapter; use the Redis or PostgreSQL adapter instead. Otherwise, calling #trigger won’t work from background jobs or the Rails console.
Constant Summary
- SUBSCRIPTION_PREFIX = "graphql-subscription:"
- EVENT_PREFIX = "graphql-event:"
Instance Attribute Summary
Attributes inherited from GraphQL::Subscriptions
Instance Method Summary
- #delete_subscription(subscription_id) ⇒ Object
  The channel was closed, forget about it.
- #deliver(subscription_id, result) ⇒ Object
  This subscription was re-evaluated.
- #execute_all(event, object) ⇒ Object
  An event was triggered; push the data over ActionCable.
- #initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions (constructor)
  A new instance of ActionCableSubscriptions.
- #load_action_cable_message(message, context) ⇒ Object
  This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object.
- #read_subscription(subscription_id) ⇒ Object
  Return the query from “storage” (in memory).
- #setup_stream(channel, initial_event) ⇒ Object
  Every subscribing channel is listening here, but only one of them takes any action.
- #write_subscription(query, events) ⇒ Object
  A query was run where these events were subscribed to.
Methods inherited from GraphQL::Subscriptions
#broadcastable?, #build_id, #each_subscription_id, #execute, #execute_update, #normalize_name, #trigger, use
Constructor Details
#initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions
Returns a new instance of ActionCableSubscriptions.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 90

def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } }
  @action_cable = action_cable
  @action_cable_coder = action_cable_coder
  @serializer = serializer
  @serialize_with_context = case @serializer.method(:load).arity
    when 1
      false
    when 2
      true
    else
      raise ArgumentError, "#{@serializer} must repond to `.load` accepting one or two arguments"
    end
  @transmit_ns = namespace
  super
end
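The constructor decides how to call the serializer by inspecting the arity of its `.load` method. The sketch below mirrors that check in isolation; `OneArgSerializer`, `TwoArgSerializer`, and `serialize_with_context?` are hypothetical names used only for illustration and are not part of the library.

```ruby
# Hypothetical serializer whose `.load` receives only the message.
module OneArgSerializer
  def self.load(message)
    message
  end
end

# Hypothetical serializer whose `.load` also receives the query context.
module TwoArgSerializer
  def self.load(message, context)
    [message, context]
  end
end

# Mirrors the constructor's arity check (illustrative helper, not library API).
def serialize_with_context?(serializer)
  case serializer.method(:load).arity
  when 1 then false
  when 2 then true
  else
    raise ArgumentError, "#{serializer} must respond to `.load` accepting one or two arguments"
  end
end

puts serialize_with_context?(OneArgSerializer)
puts serialize_with_context?(TwoArgSerializer)
```

Note that a `.load` with optional arguments reports a negative arity, so under this check it would be rejected rather than treated as a one- or two-argument method.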
Instance Method Details
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 215

def delete_subscription(subscription_id)
  query = @subscriptions.delete(subscription_id)
  # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel,
  # see https://github.com/rmosolgo/graphql-ruby/issues/2478
  if query
    events = query.context.namespace(:subscriptions)[:events]
    events.each do |event|
      ev_by_fingerprint = @events[event.topic]
      ev_for_fingerprint = ev_by_fingerprint[event.fingerprint]
      ev_for_fingerprint.delete(event)
      if ev_for_fingerprint.empty?
        ev_by_fingerprint.delete(event.fingerprint)
      end
    end
  end
end
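The cleanup above walks a nested topic → fingerprint → events structure and prunes a fingerprint once its last event is gone. A minimal sketch of that bookkeeping follows, using a plain `Hash` and `Array` as stand-ins for `Concurrent::Map` and `Concurrent::Array`; `TrackedEvent` and `forget_event` are hypothetical names, not library API.

```ruby
# Hypothetical event record; the real events are GraphQL subscription events.
TrackedEvent = Struct.new(:topic, :fingerprint, :subscription_id)

# Removes one event and prunes its fingerprint bucket when it becomes empty.
def forget_event(events_by_topic, event)
  by_fingerprint = events_by_topic[event.topic]
  for_fingerprint = by_fingerprint[event.fingerprint]
  for_fingerprint.delete(event)
  by_fingerprint.delete(event.fingerprint) if for_fingerprint.empty?
end

# topic => fingerprint => [events], created lazily like the constructor's maps.
events_by_topic = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }

first = TrackedEvent.new(":postAdded:", "fp1", "sub-a")
second = TrackedEvent.new(":postAdded:", "fp1", "sub-b")
[first, second].each { |e| events_by_topic[e.topic][e.fingerprint] << e }

forget_event(events_by_topic, first)  # "fp1" still holds `second`
forget_event(events_by_topic, second) # "fp1" is now pruned
```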
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated. Send it to the specific stream where this client was waiting.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 120

def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload)
end
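To make the payload shape concrete, here is a rough sketch that runs the same two steps against a recording double. `RecordingServer`, `FakeCable`, and `FakeResult` are hypothetical test doubles, and the stream name is assumed to be the subscription prefix followed by the ID (with an empty namespace).

```ruby
# Hypothetical stand-in for ActionCable.server that records broadcasts.
class RecordingServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

# Hypothetical stand-in for the `action_cable:` dependency.
FakeCable = Struct.new(:server)

# Hypothetical query result; only `#to_h` is needed here.
class FakeResult
  def initialize(data)
    @data = data
  end

  def to_h
    { "data" => @data }
  end
end

# Same steps as #deliver, with the stream name built inline (an assumption).
def deliver_sketch(action_cable, subscription_id, result)
  payload = { result: result.to_h, more: true }
  action_cable.server.broadcast("graphql-subscription:#{subscription_id}", payload)
end

cable = FakeCable.new(RecordingServer.new)
deliver_sketch(cable, "abc123", FakeResult.new({ "postAdded" => { "id" => 1 } }))
p cable.server.broadcasts.first
```

Because the constructor accepts an `action_cable:` option, a double along these lines could plausibly be injected in tests to capture what would have been broadcast.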
#execute_all(event, object) ⇒ Object
An event was triggered; push the data over ActionCable. Subscribers will re-evaluate locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 112

def execute_all(event, object)
  stream = stream_event_name(event)
  message = @serializer.dump(object)
  @action_cable.server.broadcast(stream, message)
end
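The helpers `stream_event_name` and `stream_subscription_name` are not shown on this page. As a sketch only, assuming each name is the matching prefix constant, then the configured namespace, then the topic or subscription ID joined together, they might look like this (the explicit `namespace` parameter is an illustrative simplification):

```ruby
SUBSCRIPTION_PREFIX = "graphql-subscription:"
EVENT_PREFIX = "graphql-event:"

# Assumption: per-subscription streams are prefix + namespace + subscription ID.
def subscription_stream_sketch(namespace, subscription_id)
  [SUBSCRIPTION_PREFIX, namespace, subscription_id].join
end

# Assumption: per-event streams are prefix + namespace + event topic.
def event_stream_sketch(namespace, topic)
  [EVENT_PREFIX, namespace, topic].join
end

puts subscription_stream_sketch("", "abc123")
puts event_stream_sketch("tenant1:", ":postAdded:")
```

Keeping the two prefixes distinct means a triggered event (fan-out to every listening process) can never collide with a delivery addressed to a single subscriber.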
#load_action_cable_message(message, context) ⇒ Object
This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 188

def load_action_cable_message(message, context)
  if @serialize_with_context
    @serializer.load(message, context)
  else
    @serializer.load(message)
  end
end
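A context-aware serializer is useful when the broadcast message carries only an identifier and the object must be reloaded per subscriber. The following sketch shows one such round trip; `RecordSerializer`, `load_message_sketch`, and the `:store` context key are all hypothetical and chosen for illustration.

```ruby
require "json"

# Hypothetical serializer: dumps only the record ID and reloads the record
# from a store supplied through the query context (the :store key is assumed).
module RecordSerializer
  def self.dump(record)
    JSON.generate("id" => record[:id])
  end

  def self.load(message, context)
    id = JSON.parse(message).fetch("id")
    context.fetch(:store).fetch(id)
  end
end

# Same branching as #load_action_cable_message, with state passed explicitly.
def load_message_sketch(serializer, with_context, message, context)
  if with_context
    serializer.load(message, context)
  else
    serializer.load(message)
  end
end

store = { 1 => { id: 1, title: "Hello" } }
message = RecordSerializer.dump(store[1])
object = load_message_sketch(RecordSerializer, true, message, { store: store })
p object
```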
#read_subscription(subscription_id) ⇒ Object
Return the query from “storage” (in memory).
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 197

def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  if query.nil?
    # This can happen when a subscription is triggered from an unsubscribed channel,
    # see https://github.com/rmosolgo/graphql-ruby/issues/2478.
    # (This `nil` is handled by `#execute_update`)
    nil
  else
    {
      query_string: query.query_string,
      variables: query.provided_variables,
      context: query.context.to_h,
      operation_name: query.operation_name,
    }
  end
end
#setup_stream(channel, initial_event) ⇒ Object
Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.
However, any channel could close at any time, so each channel has to be ready to take over the primary position.
To make sure there’s always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 160

def setup_stream(channel, initial_event)
  topic = initial_event.topic
  channel.stream_from(stream_event_name(initial_event), coder: @action_cable_coder) do |message|
    events_by_fingerprint = @events[topic]
    object = nil
    events_by_fingerprint.each do |_fingerprint, events|
      if events.any? && events.first == initial_event
        # The fingerprint has told us that this response should be shared by all subscribers,
        # so just run it once, then deliver the result to every subscriber
        first_event = events.first
        first_subscription_id = first_event.context.fetch(:subscription_id)
        object ||= load_action_cable_message(message, first_event.context)
        result = execute_update(first_subscription_id, first_event, object)
        # Having calculated the result _once_, send the same payload to all subscribers
        events.each do |event|
          subscription_id = event.context.fetch(:subscription_id)
          deliver(subscription_id, result)
        end
      end
    end
    nil
  end
end
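The "first event on the list builds the payload" rule can be simulated without ActionCable. In this sketch every listener receives each message, but only the listener whose initial event currently heads the shared list does any work; when that subscriber leaves, the next one takes over. `ListenerEvent`, `make_listener`, and the string "result" are hypothetical stand-ins for the real events, stream callbacks, and query results.

```ruby
# Hypothetical event record, reduced to what the rule needs.
ListenerEvent = Struct.new(:fingerprint, :subscription_id)

events_by_fingerprint = { "fp1" => [] } # shared list, like @events[topic]
executions = []                          # which listener built a payload
deliveries = []                          # [subscription_id, result] pairs

# Builds the callback a channel would hand to stream_from.
make_listener = lambda do |initial_event|
  lambda do |message|
    events_by_fingerprint.each do |_fingerprint, events|
      # Only the listener for the first event in the list takes action.
      next unless events.any? && events.first == initial_event
      executions << initial_event.subscription_id
      result = "result for #{message}" # stands in for execute_update
      events.each { |event| deliveries << [event.subscription_id, result] }
    end
    nil
  end
end

subscribers = %w[a b c].map { |id| ListenerEvent.new("fp1", id) }
listeners = subscribers.map do |event|
  events_by_fingerprint["fp1"] << event
  make_listener.call(event)
end

# All three listeners hear the broadcast; only subscriber "a" builds the payload.
listeners.each { |listener| listener.call("msg-1") }

# Subscriber "a" leaves; "b" is now first and takes over the primary position.
events_by_fingerprint["fp1"].delete(subscribers[0])
listeners.drop(1).each { |listener| listener.call("msg-2") }

p executions
p deliveries.size
```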
#write_subscription(query, events) ⇒ Object
A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 129

def write_subscription(query, events)
  unless (channel = query.context[:channel])
    raise GraphQL::Error, "This GraphQL Subscription client does not support the transport protocol expected"\
      "by the backend Subscription Server implementation (graphql-ruby ActionCableSubscriptions in this case)."\
      "Some official client implementation including Apollo (https://graphql-ruby.org/javascript_client/apollo_subscriptions.html), "\
      "Relay Modern (https://graphql-ruby.org/javascript_client/relay_subscriptions.html#actioncable)."\
      "GraphiQL via `graphiql-rails` may not work out of box (#1051)."
  end
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = stream_subscription_name(subscription_id)
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end