Module: MatrixSdk::Protocols::MSC

Defined in:
lib/matrix_sdk/protocols/msc.rb

Overview

Preliminary support for unmerged MSCs (Matrix Spec Changes)

Instance Method Summary collapse

Instance Method Details

#msc2108?Boolean

Check if there’s support for MSC2108 - Sync over Server Sent Events

Returns:

  • (Boolean)


10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/matrix_sdk/protocols/msc.rb', line 10

def msc2108?
  @msc ||= {}
  @msc[2108] ||= \
    begin
      request(:get, :client_r0, '/sync/sse', skip_auth: true, headers: { accept: 'text/event-stream' })
    rescue MatrixSdk::MatrixNotAuthorizedError # Returns 401 if implemented
      true
    rescue MatrixSdk::MatrixRequestError
      false
    end
rescue StandardError => e
  logger.debug "Failed to check MSC2108 status;\n#{e.inspect}"
  false
end

#msc2108_sync_sse(since: nil, **params, &on_data) ⇒ Object

Sync over Server Sent Events - MSC2108

rubocop:disable Metrics/MethodLength

Examples:

Syncing over SSE

@since = 'some token'
api.msc2108_sync_sse(since: @since) do |data, event:, id:|
  if event == 'sync'
    handle(data) # data is the same as a normal sync response
    @since = id
  end
end

Raises:

  • (ArgumentError)

See Also:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/matrix_sdk/protocols/msc.rb', line 39

def msc2108_sync_sse(since: nil, **params, &on_data)
  raise ArgumentError, 'Must be given a block accepting two args - data and { event:, id: }' \
    unless on_data.is_a?(Proc) && on_data.arity == 2
  raise 'Needs to be logged in' unless access_token # TODO: Better error

  query = params.select do |k, _v|
    %i[filter full_state set_presence].include? k
  end
  query[:user_id] = params.delete(:user_id) if protocol?(:AS) && params.key?(:user_id)

  req = Net::HTTP::Get.new(homeserver.dup.tap do |u|
    u.path = "#{api_to_path :client_r0}/sync/sse"
    u.query = URI.encode_www_form(query)
  end)
  req['accept'] = 'text/event-stream'
  req['accept-encoding'] = 'identity' # Disable compression on the SSE stream
  req['authorization'] = "Bearer #{access_token}"
  req['last-event-id'] = since if since

  cancellation_token = { run: true }

  # rubocop:disable Metrics/BlockLength
  thread = Thread.new(cancellation_token) do |ctx|
    print_http(req)
    @http_lock&.lock
    http.request req do |response|
      @http_lock&.unlock
      break unless ctx[:run]

      print_http(response, body: false)
      raise MatrixRequestError.new_by_code(JSON.parse(response.body, symbolize_names: true), response.code) unless response.is_a? Net::HTTPSuccess

      # Override buffer size for BufferedIO
      socket = response.instance_variable_get :@socket
      if socket.is_a? Net::BufferedIO
        socket.instance_eval do
          def rbuf_fill
            bufsize_override = 1024
            loop do
              case rv = @io.read_nonblock(bufsize_override, exception: false)
              when String
                @rbuf << rv
                rv.clear
                return
              when :wait_readable
                @io.to_io.wait_readable(@read_timeout) || raise(Net::ReadTimeout)
              when :wait_writable
                @io.to_io.wait_writable(@read_timeout) || raise(Net::ReadTimeout)
              when nil
                raise EOFError, 'end of file reached'
              end
            end
          end
        end
      end

      stream_id = ('A'..'Z').to_a.sample(4).join

      logger.debug "MSC2108 : #{stream_id} : Starting SSE stream."

      buffer = ''
      response.read_body do |chunk|
        buffer += chunk

        while (index = buffer.index(/\r\n\r\n|\n\n/))
          stream = buffer.slice!(0..index)

          data = ''
          event = nil
          id = nil

          stream.split(/\r?\n/).each do |part|
            /^data:(.+)$/.match(part) do |m_data|
              data += "\n" unless data.empty?
              data += m_data[1].strip
            end
            /^event:(.+)$/.match(part) do |m_event|
              event = m_event[1].strip
            end
            /^id:(.+)$/.match(part) do |m_id|
              id = m_id[1].strip
            end
            /^:(.+)$/.match(part) do |m_comment|
              logger.debug "MSC2108 : #{stream_id} : Received comment '#{m_comment[1].strip}'"
            end
          end

          if %w[sync sync_error].include? event
            data = JSON.parse(data, symbolize_names: true)
            yield((MatrixSdk::Response.new self, data), event: event, id: id)
          elsif event
            logger.info "MSC2108 : #{stream_id} : Received unknown event '#{event}'; #{data}"
          end
        end

        unless ctx[:run]
          socket.close
          break
        end
      end

      break unless ctx[:run]
    end
  ensure
    @http_lock.unlock if @http_lock&.owned?
  end
  # rubocop:enable Metrics/BlockLength

  thread.run

  [thread, cancellation_token]
end

#refresh_mscsObject



5
6
7
# File 'lib/matrix_sdk/protocols/msc.rb', line 5

def refresh_mscs
  @msc = {}
end