Class: GraphQL::Subscriptions::ActionCableSubscriptions
- Inherits:
-
GraphQL::Subscriptions
- Object
- GraphQL::Subscriptions
- GraphQL::Subscriptions::ActionCableSubscriptions
- Defined in:
- lib/graphql/subscriptions/action_cable_subscriptions.rb
Overview
A subscriptions implementation that sends data as ActionCable broadcastings.
Some things to keep in mind:
- No queueing system; ActiveJob should be added
- Take care to reload context when re-delivering the subscription. (see Query#subscription_update?)
- Avoid the async ActionCable adapter and use the redis or PostgreSQL adapters instead. Otherwise calling #trigger won’t work from background jobs or the Rails console.
Constant Summary collapse
- SUBSCRIPTION_PREFIX =
"graphql-subscription:"- EVENT_PREFIX =
"graphql-event:"
Instance Attribute Summary
Attributes inherited from GraphQL::Subscriptions
Instance Method Summary collapse
-
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
-
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated.
-
#execute_all(event, object) ⇒ Object
An event was triggered; Push the data over ActionCable.
-
#initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions
constructor
A new instance of ActionCableSubscriptions.
-
#load_action_cable_message(message, context) ⇒ Object
This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object.
-
#read_subscription(subscription_id) ⇒ Object
Return the query from “storage” (in memory).
-
#setup_stream(channel, initial_event) ⇒ Object
Every subscribing channel is listening here, but only one of them takes any action.
-
#write_subscription(query, events) ⇒ Object
A query was run where these events were subscribed to.
Methods inherited from GraphQL::Subscriptions
#broadcastable?, #build_id, #execute, #execute_update, #normalize_name, #trigger, use, #validate_update?
Constructor Details
#initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions
Returns a new instance of ActionCableSubscriptions.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 91 def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) # A per-process map of subscriptions to deliver. # This is provided by Rails, so let's use it @subscriptions = Concurrent::Map.new @events = Concurrent::Map.new do |h, k| h.compute_if_absent(k) do Concurrent::Map.new do |h2, k2| h2.compute_if_absent(k2) { Concurrent::Array.new } end end end @action_cable = action_cable @action_cable_coder = action_cable_coder @serializer = serializer @serialize_with_context = case @serializer.method(:load).arity when 1 false when 2 true else raise ArgumentError, "#{@serializer} must respond to `.load` accepting one or two arguments" end @transmit_ns = namespace super end |
Instance Method Details
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 226 def delete_subscription(subscription_id) query = @subscriptions.delete(subscription_id) # In case this came from the server, tell the client to unsubscribe: @action_cable.server.broadcast(stream_subscription_name(subscription_id), { more: false }) # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel, # see https://github.com/rmosolgo/graphql-ruby/issues/2478 if query events = query.context.namespace(:subscriptions)[:events] events.each do |event| ev_by_fingerprint = @events[event.topic] ev_for_fingerprint = ev_by_fingerprint[event.fingerprint] ev_for_fingerprint.delete(event) if ev_for_fingerprint.empty? ev_by_fingerprint.delete(event.fingerprint) end end end end |
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated. Send it to the specific stream where this client was waiting.
127 128 129 130 131 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 127 def deliver(subscription_id, result) has_more = !result.context.namespace(:subscriptions)[:final_update] payload = { result: result.to_h, more: has_more } @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload) end |
#execute_all(event, object) ⇒ Object
An event was triggered; Push the data over ActionCable. Subscribers will re-evaluate locally.
119 120 121 122 123 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 119 def execute_all(event, object) stream = stream_event_name(event) = @serializer.dump(object) @action_cable.server.broadcast(stream, ) end |
#load_action_cable_message(message, context) ⇒ Object
This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object.
199 200 201 202 203 204 205 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 199 def (, context) if @serialize_with_context @serializer.load(, context) else @serializer.load() end end |
#read_subscription(subscription_id) ⇒ Object
Return the query from “storage” (in memory)
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 208 def read_subscription(subscription_id) query = @subscriptions[subscription_id] if query.nil? # This can happen when a subscription is triggered from an unsubscribed channel, # see https://github.com/rmosolgo/graphql-ruby/issues/2478. # (This `nil` is handled by `#execute_update`) nil else { query_string: query.query_string, variables: query.provided_variables, context: query.context.to_h, operation_name: query.operation_name, } end end |
#setup_stream(channel, initial_event) ⇒ Object
Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.
But the problem is, any channel could close at any time, so each channel has to be ready to take over the primary position.
To make sure there’s always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 168 def setup_stream(channel, initial_event) topic = initial_event.topic event_stream = stream_event_name(initial_event) channel.stream_from(event_stream, coder: @action_cable_coder) do || events_by_fingerprint = @events[topic] object = nil events_by_fingerprint.each do |_fingerprint, events| if !events.empty? && events.first == initial_event # The fingerprint has told us that this response should be shared by all subscribers, # so just run it once, then deliver the result to every subscriber first_event = events.first first_subscription_id = first_event.context.fetch(:subscription_id) object ||= (, first_event.context) result = execute_update(first_subscription_id, first_event, object) if !result.nil? # Having calculated the result _once_, send the same payload to all subscribers events.each do |event| subscription_id = event.context.fetch(:subscription_id) deliver(subscription_id, result) end end end end nil end end |
#write_subscription(query, events) ⇒ Object
A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 137 def write_subscription(query, events) unless (channel = query.context[:channel]) raise GraphQL::Error, "This GraphQL Subscription client does not support the transport protocol expected"\ "by the backend Subscription Server implementation (graphql-ruby ActionCableSubscriptions in this case)."\ "Some official client implementation including Apollo (https://graphql-ruby.org/javascript_client/apollo_subscriptions.html), "\ "Relay Modern (https://graphql-ruby.org/javascript_client/relay_subscriptions.html#actioncable)."\ "GraphiQL via `graphiql-rails` may not work out of box (#1051)." end subscription_id = query.context[:subscription_id] ||= build_id stream = stream_subscription_name(subscription_id) channel.stream_from(stream) @subscriptions[subscription_id] = query events.each do |event| # Setup a new listener to run all events with this topic in this process setup_stream(channel, event) # Add this event to the list of events to be updated @events[event.topic][event.fingerprint] << event end end |