Module: Y::Actioncable::Sync

Extended by:
ActiveSupport::Concern
Defined in:
lib/y/actioncable/sync.rb

Overview

A Sync module for Rails ActionCable channels.

This module contains a set of utility methods that allows a relatively convenient implementation of a real-time sync channel. The module implements the synchronization steps described in y-protocols/sync.

Examples:

Create a SyncChannel including this module

class SyncChannel < ApplicationCable::Channel
  include Y::Actioncable::Sync

  def subscribed
    # initiate sync & subscribe to updates, with optional persistence mechanism
    sync_for(session) { |id, update| save_doc(id, update) }
  end

  def receive(message)
    # broadcast update to all connected clients on all servers
    sync_to(session, message)
  end
end

Instance Method Details

#canonical_channel_key ⇒ Object

Produces a canonical key for this channel and its parameters. This makes it possible to create separate documents for separate use cases. For example, an Issue may be the document scope while several of its fields (title, description, labels, …) are synchronized.

By default, the key is the same as the channel identifier.

Examples:

Create a new IssueChannel that syncs updates for an issue ID

# issue_channel.rb
class IssueChannel < ApplicationCable::Channel
  include Y::Actioncable::Sync
end

# issue_subscription.js
const params = { id: 1 }
consumer.subscriptions.create(
    {channel: "IssueChannel", ...params}
);

# example for a resulting canonical key (illustrative; the exact prefix and separators come from the implementation)
"issue_channel:id:1"


# File 'lib/y/actioncable/sync.rb', line 201

def canonical_channel_key
  @canonical_channel_key ||= begin
    params_part = channel_identifier.map do |k, v|
      "#{k.to_s.parameterize}-#{v.to_s.parameterize}"
    end

    "#{CHANNEL_PREFIX}:#{params_part.join(":")}"
  end
end
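
The key construction above can be tried outside of Rails with a small sketch. It is not the module's code: the `CHANNEL_PREFIX` value, the shape of the identifier hash, and the `simple_parameterize` helper (a simplified stand-in for ActiveSupport's `String#parameterize`) are assumptions made for illustration.

```ruby
# Hypothetical prefix; the real CHANNEL_PREFIX value is not shown on this page.
CHANNEL_PREFIX = "sync"

# Simplified stand-in for ActiveSupport's String#parameterize:
# lowercase, replace runs of other characters with "-", trim outer dashes.
def simple_parameterize(value)
  value.to_s.downcase.gsub(/[^a-z0-9_]+/, "-").gsub(/\A-+|-+\z/, "")
end

# Join every identifier pair as "key-value", separated by ":".
def canonical_key(identifier)
  parts = identifier.map do |k, v|
    "#{simple_parameterize(k)}-#{simple_parameterize(v)}"
  end
  "#{CHANNEL_PREFIX}:#{parts.join(":")}"
end

key = canonical_key({ channel: "IssueChannel", id: 1 })
puts key
```

Because every parameter contributes to the key, subscribing with a different `id` yields a different key and therefore a different document.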

#doc ⇒ Y::Doc

Creates the document once.

This method can be overridden if the document should be initialized with a state other than an empty one. In conjunction with load, this makes it possible to serve clients a document restored from a persistent store such as Redis or an ActiveRecord model.

Examples:

Initialize a Doc from state stored in Redis

def doc
  @doc ||= load { |id| load_doc(id) }
end

def load_doc(id)
  data = REDIS.get(id)
  data = data.unpack("C*") unless data.nil?
  data
end

Returns:

  • (Y::Doc)

    The initialized document



# File 'lib/y/actioncable/sync.rb', line 260

def doc
  @doc ||= Y::Doc.new
end

#initiate ⇒ Object

Initiates synchronization. Encodes the current state vector as a sync step 1 message and transmits it to the connecting client.



# File 'lib/y/actioncable/sync.rb', line 42

def initiate
  encoder = Y::Lib0::Encoding.create_encoder
  Y::Lib0::Encoding.write_var_uint(encoder, MESSAGE_SYNC)
  Y::Sync.write_sync_step1(encoder, doc)
  update = Y::Lib0::Encoding.to_uint8_array(encoder)
  update = Y::Lib0::Encoding.encode_uint8_array_to_base64(update)

  transmit({ update: update })
  # TODO: implement awareness https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L278-L284
end
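
The framing that #initiate performs (message type first, payload after it, the whole thing Base64-encoded) can be sketched in plain Ruby. This is an illustration only: the value `MESSAGE_SYNC = 0` follows the convention used by y-protocols and is an assumption here, the payload bytes are placeholders rather than a real state vector, and `write_var_uint` and `frame` are hypothetical helpers, not the `Y::Lib0` API.

```ruby
MESSAGE_SYNC = 0 # assumed value, not taken from this page

# Variable-length unsigned integer: 7 data bits per byte,
# high bit set on every byte except the last.
def write_var_uint(bytes, num)
  while num > 0x7f
    bytes << (0x80 | (num & 0x7f))
    num >>= 7
  end
  bytes << num
end

# Prefix the payload with the message type and Base64-encode the result
# ("m0" is strict Base64 without line breaks).
def frame(message_type, payload_bytes)
  bytes = write_var_uint([], message_type)
  bytes.concat(payload_bytes)
  [bytes.pack("C*")].pack("m0")
end

encoded = frame(MESSAGE_SYNC, [0, 1, 2]) # placeholder payload
puts encoded
```

The resulting string is what would travel in the `update` field of the transmitted hash.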

#integrate(message, field: FIELD_UPDATE) ⇒ Object

This method should be passed as a block to a stream subscription rather than called from a generic #receive method.

Parameters:

  • message (Hash)

The encoded message must include a field named exactly like the field argument. The field value must be a Base64-encoded binary.

  • field (String) (defaults to: FIELD_UPDATE)

    The field that the encoded update should be extracted from.



# File 'lib/y/actioncable/sync.rb', line 61

def integrate(message, field: FIELD_UPDATE) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
  origin = message[FIELD_ORIGIN]
  update = Y::Lib0::Decoding.decode_base64_to_uint8_array(message[field])

  encoder = Y::Lib0::Encoding.create_encoder
  decoder = Y::Lib0::Decoding.create_decoder(update)
  message_type = Y::Lib0::Decoding.read_var_uint(decoder)
  case message_type
  when MESSAGE_SYNC
    Y::Lib0::Encoding.write_var_uint(encoder, MESSAGE_SYNC)
    Y::Sync.read_sync_message(decoder, encoder, doc, nil)

    # If the `encoder` only contains the type of reply message and no
    # message, there is no need to send the message. When `encoder` only
    # contains the type of reply, its length is 1.
    if Y::Lib0::Encoding.length(encoder) > 1
      update = Y::Lib0::Encoding.to_uint8_array(encoder)
      update = Y::Lib0::Encoding.encode_uint8_array_to_base64(update)

      transmit({ update: update })
    end
  when MESSAGE_AWARENESS
    # TODO: implement awareness https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L179-L181
  else
    raise "unexpected message_type=`#{message_type}`"
  end

  # do not transmit message back to current connection if the connection
  # is the origin of the message
  connection_identifier = connection.connection_identifier
  return if connection_identifier.present? && origin == connection_identifier

  transmit(message)
end
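
Two decisions in #integrate can be illustrated without y-rb: reading the message type from the decoded bytes, and skipping the echo to the originating connection. The sketch below assumes string keys such as `"origin"` (the actual `FIELD_ORIGIN` value is not shown here); `read_var_uint`, `unframe`, and `echo?` are hypothetical helpers, not part of the module.

```ruby
# Read a variable-length unsigned integer from a byte array.
# Returns [value, position_after_the_integer].
def read_var_uint(bytes, pos = 0)
  value = 0
  shift = 0
  loop do
    byte = bytes.fetch(pos)
    pos += 1
    value |= (byte & 0x7f) << shift
    shift += 7
    return [value, pos] if byte < 0x80
  end
end

# Decode the Base64 field and split it into message type and payload bytes.
def unframe(base64)
  bytes = base64.unpack1("m0").unpack("C*")
  type, pos = read_var_uint(bytes)
  [type, bytes[pos..]]
end

# True when the message came from this very connection,
# in which case it should not be transmitted back.
def echo?(message, connection_identifier)
  return false if connection_identifier.nil? || connection_identifier.empty?

  message["origin"] == connection_identifier
end

type, payload = unframe("AAABAg==")
puts "type=#{type} payload=#{payload.inspect}"
```

A reply that holds nothing but a small message type occupies a single byte, which is why the source checks for a length greater than 1 before transmitting.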

#load {|id| ... } ⇒ Y::Doc

Loads the current state of a document from an external source and returns a reference to the document.

for block { |id| … }

Yields:

  • (id)

    Read document from e.g. an external store

Yield Parameters:

  • id (String)

    The document ID

Yield Returns:

  • (Array<Integer>)

    The binary encoded state of the document

Returns:

  • (Y::Doc)

    A reference to the loaded document



# File 'lib/y/actioncable/sync.rb', line 220

def load(&block)
  full_diff = nil
  full_diff = yield(canonical_channel_key) if block
  @doc ||= Y::Doc.new
  @doc.sync(full_diff) unless full_diff.nil?
  @doc
end

#persist {|id, update| ... } ⇒ Object

Persist the current document state to an external store.

for block { |id, update| … }

Yields:

  • (id, update)

    Store document state to e.g. an external store

Yield Parameters:

  • id (String)

    The document ID

  • update (Array<Integer>)

The full document state as a binary-encoded update



# File 'lib/y/actioncable/sync.rb', line 236

def persist(&block)
  yield(canonical_channel_key, doc.diff) if block
end
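
A matching pair of `save_doc` and `load_doc` helpers, as referenced in the examples on this page, might look like the following sketch. The `MemoryStore` class is a hypothetical in-memory stand-in for Redis or a database table, and the storage format (a packed binary string) is an assumption chosen to mirror the `unpack("C*")` call in the #doc example.

```ruby
# Hypothetical in-memory stand-in for Redis or a database table.
class MemoryStore
  def initialize
    @data = {}
  end

  def set(key, value)
    @data[key] = value
  end

  def get(key)
    @data[key]
  end
end

STORE = MemoryStore.new

# Candidate block body for `persist` or `sync_for`:
# pack the Array<Integer> into a binary string before storing it.
def save_doc(id, update)
  STORE.set(id, update.pack("C*"))
end

# Candidate block body for `load`: return the bytes, or nil if nothing is stored.
def load_doc(id)
  data = STORE.get(id)
  data&.unpack("C*")
end

save_doc("issue:1", [1, 2, 255])
restored = load_doc("issue:1")
puts restored.inspect
```

Returning `nil` for an unknown ID matters because `load` only applies the stored state when the block yields a non-nil value.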

#sync(broadcasting, message, field: FIELD_UPDATE) ⇒ Object

Synchronizes an update with all other connected clients (and server processes) by broadcasting it to the named broadcasting.

Parameters:

  • broadcasting (String)
  • message (Hash)
  • field (optional, String) (defaults to: FIELD_UPDATE)


# File 'lib/y/actioncable/sync.rb', line 150

def sync(broadcasting, message, field: FIELD_UPDATE)
  update = message[field]

  # we broadcast to all connected clients, but provide the
  # connection_identifier as origin so that the [#integrate] method is
  # able to filter sending back the update to its origin.
  ActionCable.server.broadcast(
    broadcasting,
    { update: update, origin: connection.connection_identifier }
  )
end

#sync_for(model) {|id, update| ... } ⇒ Object

Sets up synchronization for the given model. This is a utility method that simplifies the setup of a sync channel.

for block { |id, update| … }

Parameters:

  • model (Object)

Yields:

  • (id, update)

Optional block for persisting the document

Yield Parameters:

  • id (String)

    The document ID

  • update (Array<Integer>)

    The full document state as binary encoded update



# File 'lib/y/actioncable/sync.rb', line 107

def sync_for(model, &block)
  stream_for(model, coder: ActiveSupport::JSON) do |message|
    # integrate updates in the y-rb document
    integrate(message)

    # persist document
    persist(&block) if block
  end

  # negotiate initial state with client
  initiate
end

#sync_from(broadcasting) {|id, update| ... } ⇒ Object

Sets up synchronization for the given stream. This is a utility method that simplifies the setup of a sync channel.

for block { |id, update| … }

Parameters:

  • broadcasting (String)

Yields:

  • (id, update)

Optional block for persisting the document

Yield Parameters:

  • id (String)

    The document ID

  • update (Array<Integer>)

    The full document state as binary encoded update



# File 'lib/y/actioncable/sync.rb', line 131

def sync_from(broadcasting, &block)
  stream_from(broadcasting, coder: ActiveSupport::JSON) do |message|
    # integrate updates in the y-rb document
    integrate(message)

    # persist document
    persist(&block) if block
  end

  # negotiate initial state with client
  initiate
end

#sync_to(to, message, field: FIELD_UPDATE) ⇒ Object

Synchronizes an update with all other connected clients (and server processes) by broadcasting it through the channel class's broadcast_to.

Parameters:

  • to (Object)
  • message (Hash)
  • field (optional, String) (defaults to: FIELD_UPDATE)


# File 'lib/y/actioncable/sync.rb', line 168

def sync_to(to, message, field: FIELD_UPDATE)
  update = message[field]

  # we broadcast to all connected clients, but provide the
  # connection_identifier as origin so that the [#integrate] method is
  # able to filter sending back the update to its origin.
  self.class.broadcast_to(
    to,
    { update: update, origin: connection.connection_identifier }
  )
end