Module: Y::Actioncable::Sync
Extended by: ActiveSupport::Concern
Defined in: lib/y/actioncable/sync.rb
Overview
A Sync module for Rails ActionCable channels.
This module contains a set of utility methods that allow a relatively
convenient implementation of a real-time sync channel. The module
implements the synchronization steps described in y-protocols/sync.
Instance Method Summary
- #canonical_channel_key ⇒ Object
  Produce a canonical key for this channel and its parameters.
- #doc ⇒ Y::Doc
  Creates the document once.
- #initiate ⇒ Object
  Initiate synchronization.
- #integrate(message, field: FIELD_UPDATE) ⇒ Object
  This method should be passed as a block to a stream subscription rather than placed in a generic #receive method.
- #load {|id| ... } ⇒ Y::Doc
  Load the current state of a document from an external source and return a reference to the document.
- #persist {|id, update| ... } ⇒ Object
  Persist the current document state to an external store.
- #sync(broadcasting, message, field: FIELD_UPDATE) ⇒ Object
  Synchronize update with all other connected clients (and server processes).
- #sync_for(model) {|id, update| ... } ⇒ Object
  Sync for given model.
- #sync_from(broadcasting) {|id, update| ... } ⇒ Object
  Sync for given stream.
- #sync_to(to, message, field: FIELD_UPDATE) ⇒ Object
  Synchronize update with all other connected clients (and server processes).
Instance Method Details
#canonical_channel_key ⇒ Object
Produce a canonical key for this channel and its parameters. This allows us to create unique documents for separate use cases. For example, an Issue may be the document scope while several of its fields (title, description, labels, …) are synchronized.
By default, the key is derived from the channel identifier: each key/value pair is parameterized, and the pairs are joined with ":" behind CHANNEL_PREFIX.
    # File 'lib/y/actioncable/sync.rb', line 201

    def canonical_channel_key
      @canonical_channel_key ||= begin
        params_part = channel_identifier.map do |k, v|
          "#{k.to_s.parameterize}-#{v.to_s.parameterize}"
        end

        "#{CHANNEL_PREFIX}:#{params_part.join(":")}"
      end
    end
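The key derivation above can be tried outside Rails with a small sketch. It is illustrative only: `simple_parameterize` is a simplified stand-in for ActiveSupport's `String#parameterize` (it does not transliterate non-ASCII characters), and the `"sync"` prefix is a hypothetical value, since the real `CHANNEL_PREFIX` is not shown on this page.

```ruby
# Simplified stand-in for ActiveSupport's String#parameterize (assumption):
# lowercases, replaces runs of other characters with "-", trims stray dashes.
def simple_parameterize(value)
  value.to_s.downcase.gsub(/[^a-z0-9\-_]+/, "-").gsub(/\A-+|-+\z/, "")
end

# Builds a key the same way canonical_channel_key does. The default prefix
# is hypothetical; the gem's CHANNEL_PREFIX may differ.
def example_channel_key(channel_identifier, prefix: "sync")
  params_part = channel_identifier.map do |k, v|
    "#{simple_parameterize(k)}-#{simple_parameterize(v)}"
  end

  "#{prefix}:#{params_part.join(":")}"
end

key = example_channel_key({ "channel" => "IssueChannel", "id" => "42" })
puts key
```

Two subscriptions with the same channel parameters produce the same key and therefore share one document, while a different `id` yields a separate document.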
#doc ⇒ Y::Doc
Creates the document once.
This method can be overridden if the document should be initialized with a state other than an empty one. In conjunction with load, this makes it possible to provide clients with a document that is restored from a persistent store such as Redis or an ActiveRecord model.
    # File 'lib/y/actioncable/sync.rb', line 260

    def doc
      @doc ||= Y::Doc.new
    end
#initiate ⇒ Object
Initiate synchronization. Encodes the current state_vector as a sync step 1 message and transmits it to the connecting client.
    # File 'lib/y/actioncable/sync.rb', line 42

    def initiate
      encoder = Y::Lib0::Encoding.create_encoder
      Y::Lib0::Encoding.write_var_uint(encoder, MESSAGE_SYNC)
      Y::Sync.write_sync_step1(encoder, doc)
      update = Y::Lib0::Encoding.to_uint8_array(encoder)
      update = Y::Lib0::Encoding.encode_uint8_array_to_base64(update)

      transmit({ update: update })

      # TODO: implement awareness https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L278-L284
    end
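To make the shape of the transmitted payload more concrete, the following sketch frames a sync step 1 message in plain Ruby without y-rb. It assumes the y-protocols layout (message type, sync step type, length-prefixed state vector) and the lib0 variable-length integer scheme. The values `message_sync: 0` and `sync_step1: 0` are assumptions taken from y-protocols; the gem's own constants are not shown on this page.

```ruby
# Variable-length unsigned integer: 7 payload bits per byte, with the high
# bit set while more bytes follow.
def write_var_uint(bytes, number)
  while number > 0x7f
    bytes << (0x80 | (number & 0x7f))
    number >>= 7
  end
  bytes << number
end

# Byte array prefixed with its length as a variable-length integer.
def write_var_uint8_array(bytes, array)
  write_var_uint(bytes, array.length)
  bytes.concat(array)
end

# Frames a sync step 1 message and encodes it as Base64.
# message_sync and sync_step1 default to assumed y-protocols values.
def encode_sync_step1(state_vector, message_sync: 0, sync_step1: 0)
  bytes = []
  write_var_uint(bytes, message_sync)
  write_var_uint(bytes, sync_step1)
  write_var_uint8_array(bytes, state_vector)
  [bytes.pack("C*")].pack("m0") # strict Base64 without line breaks
end

# Assumption: an empty document's state vector lists zero clients, i.e. [0].
payload = { update: encode_sync_step1([0]) }
puts payload[:update]
```

The client answers this message with the updates the server is missing, which is what #integrate processes.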
#integrate(message, field: FIELD_UPDATE) ⇒ Object
This method should be passed as a block to a stream subscription rather than placed in a generic #receive method.

    # File 'lib/y/actioncable/sync.rb', line 61

    def integrate(message, field: FIELD_UPDATE) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
      origin = message[FIELD_ORIGIN]
      update = Y::Lib0::Decoding.decode_base64_to_uint8_array(message[field])

      encoder = Y::Lib0::Encoding.create_encoder
      decoder = Y::Lib0::Decoding.create_decoder(update)
      message_type = Y::Lib0::Decoding.read_var_uint(decoder)
      case message_type
      when MESSAGE_SYNC
        Y::Lib0::Encoding.write_var_uint(encoder, MESSAGE_SYNC)
        Y::Sync.read_sync_message(decoder, encoder, doc, nil)

        # If the `encoder` only contains the type of reply message and no
        # message, there is no need to send the message. When `encoder` only
        # contains the type of reply, its length is 1.
        if Y::Lib0::Encoding.length(encoder) > 1
          update = Y::Lib0::Encoding.to_uint8_array(encoder)
          update = Y::Lib0::Encoding.encode_uint8_array_to_base64(update)

          transmit({ update: update })
        end
      when MESSAGE_AWARENESS
        # TODO: implement awareness https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L179-L181
      else
        raise "unexpected message_type=`#{message_type}`"
      end

      # do not transmit message back to current connection if the connection
      # is the origin of the message
      connection_identifier = connection.connection_identifier
      return if connection_identifier.present? && origin == connection_identifier

      transmit(message)
    end
#load {|id| ... } ⇒ Y::Doc
Load the current state of a document from an external source and return a reference to the document.
Yields: id, the canonical channel key. The block returns the stored document state, or nil if nothing has been stored yet.

    # File 'lib/y/actioncable/sync.rb', line 220

    def load(&block)
      full_diff = nil
      full_diff = yield(canonical_channel_key) if block
      @doc ||= Y::Doc.new
      @doc.sync(full_diff) unless full_diff.nil?
      @doc
    end
#persist {|id, update| ... } ⇒ Object
Persist the current document state to an external store.
Yields: id, the canonical channel key, and update, the current document state (doc.diff).

    # File 'lib/y/actioncable/sync.rb', line 236

    def persist(&block)
      yield(canonical_channel_key, doc.diff) if block
    end
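The way #load and #persist fit together can be sketched as a round trip through an external store. Everything named `Stub…` below is a stand-in invented for this example so that it runs without y-rb or Rails: `StubDoc` only records updates (the real `Y::Doc` merges CRDT updates), and the Hash is a hypothetical replacement for Redis or a model column.

```ruby
# Stand-in for Y::Doc (assumption): keeps applied updates in an array.
class StubDoc
  def initialize
    @updates = []
  end

  def sync(update)
    @updates.concat(update)
  end

  def diff
    @updates.dup
  end
end

# Stand-in channel reusing the load/persist logic shown above.
class StubChannel
  def initialize(key)
    @key = key
  end

  def canonical_channel_key
    @key
  end

  def doc
    @doc ||= StubDoc.new
  end

  def load
    full_diff = block_given? ? yield(canonical_channel_key) : nil
    @doc ||= StubDoc.new
    @doc.sync(full_diff) unless full_diff.nil?
    @doc
  end

  def persist
    yield(canonical_channel_key, doc.diff) if block_given?
  end
end

store = {} # hypothetical in-memory store keyed by canonical channel key

writer = StubChannel.new("sync:id-1")
writer.doc.sync([1, 2, 3])
writer.persist { |id, update| store[id] = update }

# A later subscription with the same key restores the persisted state.
reader = StubChannel.new("sync:id-1")
restored = reader.load { |id| store[id] }
puts restored.diff.inspect
```

Because both blocks receive the canonical channel key, the same key can serve as the storage identifier on the write and the read side.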
#sync(broadcasting, message, field: FIELD_UPDATE) ⇒ Object
Synchronize an update with all other connected clients (and server processes) by broadcasting it to the given broadcasting via ActionCable.server.broadcast.

    # File 'lib/y/actioncable/sync.rb', line 150

    def sync(broadcasting, message, field: FIELD_UPDATE)
      update = message[field]

      # we broadcast to all connected clients, but provide the
      # connection_identifier as origin so that the [#integrate] method is
      # able to filter sending back the update to its origin.
      ActionCable.server.broadcast(
        broadcasting,
        { update: update, origin: connection.connection_identifier }
      )
    end
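The origin field is what lets #integrate avoid echoing an update back to its sender. A minimal sketch of that interplay, using stand-in objects rather than ActionCable (the `StubConnection` class and the `broadcast` helper are invented for this example, and symbol keys are used for brevity):

```ruby
# Stand-in connection (assumption): collects messages instead of writing to
# a WebSocket.
class StubConnection
  attr_reader :identifier, :received

  def initialize(identifier)
    @identifier = identifier
    @received = []
  end

  # Mirrors the guard at the end of #integrate: skip the message when this
  # connection is its origin.
  def deliver(message)
    return if !identifier.to_s.empty? && message[:origin] == identifier

    @received << message
  end
end

# Stand-in for a broadcast: every subscriber is offered the same payload,
# tagged with the sender's identifier as origin.
def broadcast(connections, sender, update)
  message = { update: update, origin: sender.identifier }
  connections.each { |connection| connection.deliver(message) }
end

alice = StubConnection.new("alice")
bob = StubConnection.new("bob")
broadcast([alice, bob], alice, "AAABAA==")

puts alice.received.length
puts bob.received.length
```

The sender is filtered out on the receiving side, so the broadcast itself does not need to know which connections are subscribed.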
#sync_for(model) {|id, update| ... } ⇒ Object
Sync for given model. This is a utility method that simplifies the setup of a sync channel.
Yields: id and update. The block is forwarded to #persist after each integrated message.

    # File 'lib/y/actioncable/sync.rb', line 107

    def sync_for(model, &block)
      stream_for(model, coder: ActiveSupport::JSON) do |message|
        # integrate updates in the y-rb document
        integrate(message)

        # persist document
        persist(&block) if block
      end

      # negotiate initial state with client
      initiate
    end
#sync_from(broadcasting) {|id, update| ... } ⇒ Object
Sync for given stream. This is a utility method that simplifies the setup of a sync channel.
Yields: id and update. The block is forwarded to #persist after each integrated message.

    # File 'lib/y/actioncable/sync.rb', line 131

    def sync_from(broadcasting, &block)
      stream_from(broadcasting, coder: ActiveSupport::JSON) do |message|
        # integrate updates in the y-rb document
        integrate(message)

        # persist document
        persist(&block) if block
      end

      # negotiate initial state with client
      initiate
    end
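The order of operations in #sync_for and #sync_from can be illustrated with a stubbed channel. This is a sketch under stated assumptions: `StubSyncChannel`, `simulate_broadcast`, and the `"stub-key"`/`"stub-update"` values are invented for the example, and the stubbed methods only record that they were called.

```ruby
# Stand-in channel (assumption): records the order of calls instead of
# talking to ActionCable or y-rb.
class StubSyncChannel
  attr_reader :log

  def initialize
    @log = []
    @handlers = {}
  end

  # Same shape as the sync_from shown above, minus the coder option.
  def sync_from(broadcasting, &block)
    stream_from(broadcasting) do |message|
      integrate(message)
      persist(&block) if block
    end

    initiate
  end

  # Simulates a message arriving on the given broadcasting.
  def simulate_broadcast(broadcasting, message)
    @handlers.fetch(broadcasting).call(message)
  end

  private

  def stream_from(broadcasting, &handler)
    @handlers[broadcasting] = handler
  end

  def integrate(message)
    @log << [:integrate, message]
  end

  def persist
    @log << [:persist]
    yield("stub-key", "stub-update") if block_given?
  end

  def initiate
    @log << [:initiate]
  end
end

saved = []
channel = StubSyncChannel.new
channel.sync_from("issue-1") { |id, update| saved << [id, update] }
channel.simulate_broadcast("issue-1", { "update" => "AAABAA==" })

puts channel.log.map(&:first).inspect
```

Synchronization is initiated once at subscription time, while integration and persistence run for every message that arrives on the stream.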
#sync_to(to, message, field: FIELD_UPDATE) ⇒ Object
Synchronize an update with all other connected clients (and server processes) by broadcasting it to the given target via the channel's broadcast_to.

    # File 'lib/y/actioncable/sync.rb', line 168

    def sync_to(to, message, field: FIELD_UPDATE)
      update = message[field]

      # we broadcast to all connected clients, but provide the
      # connection_identifier as origin so that the [#integrate] method is
      # able to filter sending back the update to its origin.
      self.class.broadcast_to(
        to,
        { update: update, origin: connection.connection_identifier }
      )
    end