Request

Request represents either a request or a notification in JSON-RPC 2.0.

type Request struct {
  Method string           `json:"method"`
  Params *json.RawMessage `json:"params,omitempty"`
  ID     ID               `json:"id"`
  Notif  bool             `json:"-"`

  Meta *json.RawMessage `json:"meta,omitempty"`
}

The Meta field is not part of the JSON-RPC spec; it is an aid for tracking context. The Notif field indicates whether the message is a notification. The Params and Meta fields both hold raw, already-serialized JSON.

Request has several methods:

  • MarshalJSON serializes the Request to JSON.
  • UnmarshalJSON deserializes JSON into a Request object.
  • SetParams/SetMeta set the Params/Meta fields respectively.
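
The effect of the Notif field on the wire format can be sketched as follows. This is a minimal illustration, not the package's source: wireRequest and encode are hypothetical names, and the ID is simplified to a plain uint64. The assumption is that a notification is serialized without an "id" member, while a request always carries one.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wireRequest is a hypothetical stand-in for Request; it is not the package's type.
type wireRequest struct {
	Method string
	Params *json.RawMessage
	ID     uint64
	Notif  bool
}

// MarshalJSON emits the "id" member only for requests that expect a response.
func (r wireRequest) MarshalJSON() ([]byte, error) {
	out := struct {
		JSONRPC string           `json:"jsonrpc"`
		Method  string           `json:"method"`
		Params  *json.RawMessage `json:"params,omitempty"`
		ID      *uint64          `json:"id,omitempty"`
	}{JSONRPC: "2.0", Method: r.Method, Params: r.Params}
	if !r.Notif {
		out.ID = &r.ID // a non-nil pointer is kept even when the ID is 0
	}
	return json.Marshal(out)
}

// encode returns the JSON text of a request and panics on failure.
func encode(r wireRequest) string {
	b, err := json.Marshal(r)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	params := json.RawMessage(`[1,2]`)
	fmt.Println(encode(wireRequest{Method: "sum", Params: &params, ID: 7}))
	fmt.Println(encode(wireRequest{Method: "log", Notif: true}))
}
```

The pointer-typed ID in the helper struct is what lets a request with ID 0 still serialize its id, while a notification drops the member entirely.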

The ID is also defined in the spec and can be a string, a number, or null. The current implementation of this package does not support IDs that are

type ID struct {
  Num uint64
  Str string

  IsString bool
}

In most cases, at most one of Num and Str is non-zero; when both are zero, IsString indicates which field is used as the id. ID has the following methods:

  • String returns a printable form of the ID
  • MarshalJSON serializes the ID
  • UnmarshalJSON deserializes the ID

Response

Response represents only a response; unlike Request, it never stands for a notification.

type Response struct {
  ID     ID               `json:"id"`
  Result *json.RawMessage `json:"result,omitempty"`
  Error  *Error           `json:"error,omitempty"`

  Meta *json.RawMessage `json:"meta,omitempty"`
}

A response carries either a Result or an Error, never both. One special kind of error, the request ID error, is simply ignored to keep things simple.

Response also has several methods:

  • MarshalJSON serializes the Response to JSON
  • UnmarshalJSON deserializes JSON into a Response object
  • SetResult sets the Result field

As the source code shows, Request and Response are written in two different styles when serializing and deserializing, a curious trick on the author's part.

The spec defines Response.Error precisely:

type Error struct {
  Code    int64            `json:"code"`
  Message string           `json:"message"`
  Data    *json.RawMessage `json:"data"`
}

Methods of Error:

  • SetError sets the Data field of Error
  • Error implements the error interface

The following are the error codes specified in the spec:

const (
  CodeParseError     = -32700
  CodeInvalidRequest = -32600
  CodeMethodNotFound = -32601
  CodeInvalidParams  = -32602
  CodeInternalError  = -32603
)

Conn

Conn is a JSON-RPC connection between a client and a server. Client and server are symmetric, so both sides use the Conn object.

type Conn struct {
  stream ObjectStream

  h Handler

  mu       sync.Mutex
  shutdown bool
  closing  bool
  seq      uint64
  pending  map[ID]*call

  sending sync.Mutex

  disconnect chan struct{}

  logger Logger

  onRecv []func(*Request, *Response)
  onSend []func(*Request, *Response)
}

Conn involves many other types. Let's analyze them one by one and then look at the functions Conn provides.

ObjectStream

ObjectStream is a bidirectional stream of JSON-RPC 2.0 objects: objects can be written to the stream and read back from it.

type ObjectStream interface {
  WriteObject(obj interface{}) error
  ReadObject(v interface{}) error

  io.Closer
}

Looking up the implementing types with GoImplements shows that bufferedObjectStream and a type in one of the sub-packages implement this interface. Let's look at bufferedObjectStream first:

type bufferedObjectStream struct {
  conn io.Closer // all writes should go through w, all reads through r
  w    *bufio.Writer
  r    *bufio.Reader

  codec ObjectCodec

  mu sync.Mutex
}

bufferedObjectStream uses a buffered io.ReadWriteCloser to send and receive JSON-RPC 2.0 objects. Before analyzing bufferedObjectStream, let's take a look at ObjectCodec:

ObjectCodec specifies how to encode and decode jsonrpc2.0 objects in the stream.

type ObjectCodec interface {
  WriteObject(stream io.Writer, obj interface{}) error
  ReadObject(stream *bufio.Reader, v interface{}) error
}

Two types implement ObjectCodec, so there are two encodings: VarintObjectCodec and VSCodeObjectCodec.

VarintObjectCodec uses a variable-length header that holds the length of the payload. First, look at how an object is written:

    type VarintObjectCodec struct{}
    func (VarintObjectCodec) WriteObject(
      stream io.Writer, obj interface{}) error {
      data, err := json.Marshal(obj)
      if err != nil {
        return err
      }
      var buf [binary.MaxVarintLen64]byte
      b := binary.PutUvarint(buf[:], uint64(len(data)))
      if _, err := stream.Write(buf[:b]); err != nil {
        return err
      }
      if _, err := stream.Write(data); err != nil {
        return err
      }
      return nil
    }

binary.MaxVarintLen64 is 10, which means a varint-encoded 64-bit integer occupies at most 10 bytes. The flow of VarintObjectCodec.WriteObject is therefore:

  • Serialize the object to JSON
  • Write the varint-encoded length to the stream
  • Write the JSON-serialized data to the stream

Because the length header is itself a varint, it takes more bytes for longer payloads; this is why the encoding is called variable-length.

func (VarintObjectCodec) ReadObject(
  stream *bufio.Reader, v interface{}) error {
  b, err := binary.ReadUvarint(stream)
  if err != nil {
    return err
  }
  return json.NewDecoder(io.LimitReader(stream, int64(b))).Decode(v)
}

ReadObject first reads a varint-encoded uint64 length from the stream and then reads exactly that many bytes. Those bytes are the JSON-serialized data, which is then deserialized.

The other encoding is VSCodeObjectCodec, whose header carries the length and, optionally, the content type. This format is defined by Microsoft in LSP.
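
A round trip through this framing can be sketched with the standard library alone. The helper names (writeFrame, readFrame, headerLen, roundTrip) are made up for this example and only mirror the logic shown above; they are not part of the package.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

// writeFrame writes a uvarint length prefix followed by the JSON payload.
func writeFrame(w io.Writer, v interface{}) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(data)))
	if _, err := w.Write(hdr[:n]); err != nil {
		return err
	}
	_, err = w.Write(data)
	return err
}

// readFrame reads the length prefix, then decodes at most that many bytes.
func readFrame(r *bufio.Reader, v interface{}) error {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return err
	}
	return json.NewDecoder(io.LimitReader(r, int64(size))).Decode(v)
}

// headerLen reports how many bytes the length prefix needs for a payload size.
func headerLen(payloadSize uint64) int {
	var hdr [binary.MaxVarintLen64]byte
	return binary.PutUvarint(hdr[:], payloadSize)
}

// roundTrip frames the messages back to back and reads them out again.
func roundTrip(msgs ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&buf, m); err != nil {
			return nil, err
		}
	}
	r := bufio.NewReader(&buf)
	out := make([]string, 0, len(msgs))
	for range msgs {
		var s string
		if err := readFrame(r, &s); err != nil {
			return nil, err
		}
		out = append(out, s)
	}
	return out, nil
}

func main() {
	out, err := roundTrip("hello", "world")
	fmt.Println(out, err)
	// The header grows with the payload size: 7 bits of length per byte.
	fmt.Println(headerLen(127), headerLen(128), headerLen(1<<63))
}
```

Because every frame states its own length, several messages can share one stream without any delimiter between them.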

type VSCodeObjectCodec struct{}
func (VSCodeObjectCodec) WriteObject(
  stream io.Writer, obj interface{}) error {
  data, err := json.Marshal(obj)
  if err != nil {
    return err
  }
  if _, err := fmt.Fprintf(
      stream, "Content-Length: %d\r\n\r\n", len(data)); err != nil {
    return err
  }
  if _, err := stream.Write(data); err != nil {
    return err
  }
  return nil
}

As you can see, the Content-Type is fixed (JSON-serialized data), so the implementation only writes the length. A blank line separates the header from the content, and the line ending is '\r\n'.

    func (VSCodeObjectCodec) ReadObject(
      stream *bufio.Reader, v interface{}) error {
      var contentLength uint64
      for {
        line, err := stream.ReadString('\r')
        if err != nil {
          return err
        }
        b, err := stream.ReadByte()
        if err != nil {
          return err
        }
        if b != '\n' {
          return fmt.Errorf(`jsonrpc2: line endings must be \r\n`)
        }
        if line == "\r" {
          break
        }
        if strings.HasPrefix(line, "Content-Length: ") {
          line = strings.TrimPrefix(line, "Content-Length: ")
          line = strings.TrimSpace(line)
          var err error
          contentLength, err = strconv.ParseUint(line, 10, 32)
          if err != nil {
            return err
          }
        }
      }
      if contentLength == 0 {
        return fmt.Errorf("jsonrpc2: no Content-Length header found")
      }
      return json.NewDecoder(io.LimitReader(stream, int64(contentLength))).Decode(v)
    }

The helpers used here: ReadString() reads up to and including the specified character; strings.HasPrefix() reports whether a string starts with the given prefix; strings.TrimSpace() removes leading and trailing whitespace.

The for loop neatly handles both '\r\n' sequences: by the time it exits, the content length has been parsed as well. Finally, the json library does the deserialization.

With the two encodings covered, let's come back to bufferedObjectStream.

First, look at how bufferedObjectStream implements ObjectStream:
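
The header handling can be tried out in isolation with a small sketch. It is a variant, not the package's code: frame, readPayload and parseAll are hypothetical helpers, and the reader splits on '\n' and checks for a trailing "\r\n" instead of splitting on '\r' as the code above does.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// frame prepends the Content-Length header and the blank line to a payload.
func frame(payload string) string {
	return fmt.Sprintf("Content-Length: %d\r\n\r\n%s", len(payload), payload)
}

// readPayload parses header lines up to the blank line, then reads the body.
func readPayload(r *bufio.Reader) (string, error) {
	var length uint64
	found := false
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return "", err
		}
		if !strings.HasSuffix(line, "\r\n") {
			return "", fmt.Errorf(`line endings must be \r\n`)
		}
		line = strings.TrimSuffix(line, "\r\n")
		if line == "" {
			break // blank line: the headers are finished
		}
		if strings.HasPrefix(line, "Content-Length:") {
			v := strings.TrimSpace(strings.TrimPrefix(line, "Content-Length:"))
			length, err = strconv.ParseUint(v, 10, 32)
			if err != nil {
				return "", err
			}
			found = true
		}
	}
	if !found {
		return "", fmt.Errorf("no Content-Length header found")
	}
	body := make([]byte, length)
	if _, err := io.ReadFull(r, body); err != nil {
		return "", err
	}
	return string(body), nil
}

// parseAll reads every framed payload from an in-memory stream.
// For simplicity it treats any io.EOF at a frame boundary or inside a
// header as the end of the stream.
func parseAll(stream string) ([]string, error) {
	r := bufio.NewReader(strings.NewReader(stream))
	var out []string
	for {
		p, err := readPayload(r)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, p)
	}
}

func main() {
	stream := frame(`{"jsonrpc":"2.0","method":"ping"}`) + frame(`[]`)
	fmt.Println(parseAll(stream))
}
```

Unknown header lines, such as a Content-Type line, are skipped by the loop, which matches the behavior described above of only caring about the length.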

func (t *bufferedObjectStream) WriteObject(obj interface{}) error {
  t.mu.Lock()
  defer t.mu.Unlock()
  if err := t.codec.WriteObject(t.w, obj); err != nil {
    return err
  }
  return t.w.Flush()
}
// In the end, writing the object is delegated to the concrete codec.

func (t *bufferedObjectStream) ReadObject(v interface{}) error {
  return t.codec.ReadObject(t.r, v)
}

func (t *bufferedObjectStream) Close() error {
  return t.conn.Close()
}

A closer look: bufferedObjectStream is not exported and is constructed through NewBufferedStream. Reading and writing objects is delegated to the concrete codec, so bufferedObjectStream really just wraps the underlying stream, which is supplied to NewBufferedStream.

func NewBufferedStream(
  conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
  return &bufferedObjectStream{
    conn:  conn,
    w:     bufio.NewWriter(conn),
    r:     bufio.NewReader(conn),
    codec: codec,
  }
}

From this constructor, we can see that the underlying stream used for reading and writing is an io.ReadWriteCloser.

At this point, we are done analyzing everything related to ObjectStream (except for the sub-packages). Let's go back to the Conn object.

Handler

Handler is the second field in the Conn structure.

type Handler interface {
  Handle(context.Context, *Conn, *Request)
}

Handler is an interface for handling JSON-RPC requests and notifications. Handle() is called for one request at a time; if ordering does not matter, you can use the asynchronous version, AsyncHandler. We first look at the synchronous HandlerWithErrorConfigurer and then at AsyncHandler.

type HandlerWithErrorConfigurer struct {
  handleFunc func(
    context.Context, *Conn, *Request) (result interface{}, err error)
  suppressErrClosed bool
}

This structure is relatively simple: it contains a function and a flag. The constructor is HandlerWithError:

func HandlerWithError(
  handleFunc func(context.Context, *Conn, *Request) (result interface{}, err error),
) *HandlerWithErrorConfigurer {
  return &HandlerWithErrorConfigurer{handleFunc: handleFunc}
}

The implementation of the Handler is as follows:

    func (h *HandlerWithErrorConfigurer) Handle(
      ctx context.Context, conn *Conn, req *Request) {
      result, err := h.handleFunc(ctx, conn, req)
      if req.Notif {
        if err != nil {
          conn.logger.Printf("jsonrpc2 handler: notification %q handling error: %v\n",
            req.Method, err)
        }
        return
      }

      resp := &Response{ID: req.ID}
      if err == nil {
        err = resp.SetResult(result)
      }
      if err != nil {
        if e, ok := err.(*Error); ok {
          resp.Error = e
        } else {
          resp.Error = &Error{Message: err.Error()}
        }
      }

      if !req.Notif {
        if err := conn.SendResponse(ctx, resp); err != nil {
          if err != ErrClosed || !h.suppressErrClosed {
            conn.logger.Printf("jsonrpc2 handler: sending response %s: %v\n",
              resp.ID, err)
          }
        }
      }
    }

The processing flow is as follows:

  • The handler function supplied at construction time is called.
  • If the Request is a notification, any error is logged and processing ends there.
  • If the Request is a request, the function's return value becomes the result:
    • Construct a Response, converting any error into its Error field
    • Call SendResponse to send the response
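
The error-conversion step of this flow can be sketched separately. The names rpcError, toRPCError and errPlain are hypothetical, and the sketch uses errors.As where the code above uses a plain type assertion, so unlike the original it also finds an error that has been wrapped.

```go
package main

import (
	"errors"
	"fmt"
)

// rpcError is a stand-in for the Error type shown earlier (Data omitted).
type rpcError struct {
	Code    int64
	Message string
}

// Error implements the error interface.
func (e *rpcError) Error() string {
	return fmt.Sprintf("jsonrpc2: code %d message: %s", e.Code, e.Message)
}

// errPlain is an ordinary error used for the demonstration.
var errPlain = errors.New("boom")

// toRPCError keeps an existing *rpcError and wraps any other error
// in a new one that carries only the message (Code stays 0).
func toRPCError(err error) *rpcError {
	var e *rpcError
	if errors.As(err, &e) {
		return e
	}
	return &rpcError{Message: err.Error()}
}

func main() {
	notFound := &rpcError{Code: -32601, Message: "method not found"}
	wrapped := fmt.Errorf("dispatch: %w", notFound)

	fmt.Println(toRPCError(notFound) == notFound) // same pointer is reused
	fmt.Println(toRPCError(wrapped) == notFound)  // found through the wrapper
	fmt.Println(toRPCError(errPlain).Message)
}
```

Returning a typed error from the handler function is therefore the way to control the code that the peer sees; any other error reaches the peer with only its message.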

HandlerWithErrorConfigurer can also suppress the log message for connection-closed errors (the suppressErrClosed flag). Next, look at the asynchronous Handler:

func AsyncHandler(h Handler) Handler {
  return asyncHandler{h}
}

type asyncHandler struct {
  Handler
}

func (h asyncHandler) Handle(ctx context.Context, conn *Conn, req *Request) {
  go h.Handler.Handle(ctx, conn, req)
}

The asynchronous version is simple: it embeds a Handler interface value and uses go to process each request in its own goroutine. Note that the constructor AsyncHandler is exported, while the implementing struct asyncHandler is not. Because asyncHandler embeds a Handler, the request is ultimately handled by the wrapped handler, for example a HandlerWithErrorConfigurer.
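
The difference between the two variants can be observed with a simplified sketch. The handler interface here is a cut-down, hypothetical version that takes only a method name (no context or Conn), and dispatch plays the role of the read loop.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a simplified stand-in for the Handler interface.
type handler interface {
	Handle(method string)
}

// handlerFunc adapts a plain function to the handler interface.
type handlerFunc func(method string)

func (f handlerFunc) Handle(method string) { f(method) }

// async wraps a handler so that each call runs in its own goroutine.
type async struct{ handler }

func (a async) Handle(method string) { go a.handler.Handle(method) }

// dispatch feeds the methods to the wrapped handler one by one, as a read
// loop would, waits until all of them are handled, and returns the order
// in which they were handled.
func dispatch(wrap func(handler) handler, methods []string) []string {
	var (
		mu   sync.Mutex
		seen []string
		wg   sync.WaitGroup
	)
	wg.Add(len(methods))
	inner := handlerFunc(func(m string) {
		defer wg.Done()
		mu.Lock()
		seen = append(seen, m)
		mu.Unlock()
	})
	h := wrap(inner)
	for _, m := range methods {
		h.Handle(m)
	}
	wg.Wait()
	return seen
}

func main() {
	methods := []string{"a", "b", "c"}
	// Synchronous: handled strictly in arrival order.
	fmt.Println(dispatch(func(h handler) handler { return h }, methods))
	// Asynchronous: all are handled, but the order is not guaranteed.
	fmt.Println(dispatch(func(h handler) handler { return async{h} }, methods))
}
```

The trade-off is the one named above: the synchronous handler preserves ordering but blocks the read loop while it runs, and the asynchronous wrapper frees the loop at the cost of ordering.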

call

call represents the entire life cycle of a jsonrpc call.

type call struct {
  request  *Request
  response *Response
  seq      uint64 // the seq of the request
  done     chan error
}

The pending field of Conn, a map[ID]*call, records all in-flight calls.
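
How a pending map and a done channel cooperate can be sketched as follows. This is a simplified model under assumptions, not the package's code: pendingCalls, register, resolve and failAll are hypothetical names, and IDs are plain sequence numbers.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is the error handed to callers when the connection goes away.
var errClosed = errors.New("connection is closed")

// pendingCalls is a simplified model of the pending map in Conn.
type pendingCalls struct {
	mu    sync.Mutex
	seq   uint64
	calls map[uint64]chan error
}

func newPendingCalls() *pendingCalls {
	return &pendingCalls{calls: map[uint64]chan error{}}
}

// register allocates the next ID and a buffered channel the caller waits on.
func (p *pendingCalls) register() (uint64, <-chan error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	id := p.seq
	p.seq++
	done := make(chan error, 1)
	p.calls[id] = done
	return id, done
}

// resolve completes the call with the given ID; it reports false for
// IDs that are unknown or already completed.
func (p *pendingCalls) resolve(id uint64, err error) bool {
	p.mu.Lock()
	done, ok := p.calls[id]
	delete(p.calls, id)
	p.mu.Unlock()
	if !ok {
		return false
	}
	done <- err // never blocks: the channel has capacity 1
	close(done)
	return true
}

// failAll completes every outstanding call with err, as on disconnect.
func (p *pendingCalls) failAll(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for id, done := range p.calls {
		done <- err
		close(done)
		delete(p.calls, id)
	}
}

func main() {
	p := newPendingCalls()
	id1, done1 := p.register()
	_, done2 := p.register()

	p.resolve(id1, nil)  // a response for the first call arrived
	p.failAll(errClosed) // the stream broke before the second one did

	fmt.Println(<-done1, <-done2)
}
```

The buffered channel is the important detail: whoever completes a call can send without waiting for the caller, even if the caller has already given up because its context was canceled.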

Logger

Logger is the logging interface; the standard library's log.Logger satisfies it.

type Logger interface {
  Printf(format string, v ...interface{})
}

Conn analysis

The first line below is a compile-time check that Conn implements the JSONRPC2 interface; NewConn is the constructor:

var _ JSONRPC2 = (*Conn)(nil)
func NewConn(ctx context.Context, stream ObjectStream,
  h Handler, opts ...ConnOpt) *Conn {
  c := &Conn{
    stream:     stream,
    h:          h,
    pending:    map[ID]*call{},
    disconnect: make(chan struct{}),
    logger:     log.New(os.Stderr, "", log.LstdFlags),
  }
  for _, opt := range opts {
    if opt == nil {
      continue
    }
    opt(c)
  }
  go c.readMessages(ctx)
  return c
}

The construction is quite interesting: ObjectStream and Handler are both interfaces, so their implementations must be constructed before they can be passed in. By default, logs go to os.Stderr. The constructor also does two things:

  • Iterate over opts and apply each option to the constructed Conn.
  • Start the read goroutine.

ConnOpt provides custom configuration of Conn. readMessages() is the read goroutine. Before we dive in, let's analyze the anyMessage type:

anyMessage

anyMessage represents a Request or a Response:

type anyMessage struct {
  request  *Request
  response *Response
}

func (m anyMessage) MarshalJSON() ([]byte, error) {
  var v interface{}
  switch {
  case m.request != nil && m.response == nil:
    v = m.request
  case m.request == nil && m.response != nil:
    v = m.response
  }
  if v != nil {
    return json.Marshal(v)
  }
  return nil, errors.New(
    "jsonrpc2: message must have exactly " +
      "one of the request or response fields set")
}

JSON serialization simply delegates to serializing the contained Request or Response.

    func (m *anyMessage) UnmarshalJSON(data []byte) error {
      type msg struct {
        ID     interface{}              `json:"id"`
        Method *string                  `json:"method"`
        Result anyValueWithExplicitNull `json:"result"`
        Error  interface{}              `json:"error"`
      }

      var isRequest, isResponse bool
      checkType := func(m *msg) error {
        mIsRequest := m.Method != nil
        mIsResponse := m.Result.null || m.Result.value != nil || m.Error != nil
        if (!mIsRequest && !mIsResponse) || (mIsRequest && mIsResponse) {
          return errors.New(
            "jsonrpc2: unable to determine message type (request or response)")
        }
        if (mIsRequest && isResponse) || (mIsResponse && isRequest) {
          return errors.New(
            "jsonrpc2: batch message type mismatch (must be all requests or all responses)")
        }
        isRequest = mIsRequest
        isResponse = mIsResponse
        return nil
      }

      if isArray := len(data) > 0 && data[0] == '['; isArray {
        var msgs []msg
        if err := json.Unmarshal(data, &msgs); err != nil {
          return err
        }
        if len(msgs) == 0 {
          return errors.New("jsonrpc2: invalid empty batch")
        }
        for i := range msgs {
          if err := checkType(&msg{
            ID:     msgs[i].ID,
            Method: msgs[i].Method,
            Result: msgs[i].Result,
            Error:  msgs[i].Error,
          }); err != nil {
            return err
          }
        }
      } else {
        var m msg
        if err := json.Unmarshal(data, &m); err != nil {
          return err
        }
        if err := checkType(&m); err != nil {
          return err
        }
      }

      var v interface{}
      switch {
      case isRequest && !isResponse:
        v = &m.request
      case !isRequest && isResponse:
        v = &m.response
      }
      if err := json.Unmarshal(data, v); err != nil {
        return err
      }
      if !isRequest && isResponse && m.response.Error == nil &&
        m.response.Result == nil {
        m.response.Result = &jsonNull
      }
      return nil
    }

This code can be divided into three parts:

  • Define a detection function that determines, after deserialization, whether a message is a Request or a Response.
  • Deserialize the JSON and apply the detection function; arrays (batches) of Requests or of Responses are supported.
  • Deserialize the JSON into the chosen object.
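
The detection step can be sketched on its own. This is a reduced illustration, not the package's code: classify is a hypothetical function, batches are not handled, and json.RawMessage takes the place of anyValueWithExplicitNull for telling an explicit null result apart from a missing one.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// classify reports "request" or "response" by probing which members are present.
func classify(data []byte) (string, error) {
	var probe struct {
		Method *string         `json:"method"`
		Result json.RawMessage `json:"result"`
		Error  json.RawMessage `json:"error"`
	}
	if err := json.Unmarshal(data, &probe); err != nil {
		return "", err
	}
	isRequest := probe.Method != nil
	// A RawMessage field is nil only when the member is absent;
	// an explicit `"result": null` is captured as the bytes "null".
	isResponse := probe.Result != nil || probe.Error != nil
	switch {
	case isRequest && !isResponse:
		return "request", nil
	case isResponse && !isRequest:
		return "response", nil
	}
	return "", errors.New("unable to determine message type (request or response)")
}

func main() {
	for _, raw := range []string{
		`{"jsonrpc":"2.0","id":1,"method":"ping"}`,
		`{"jsonrpc":"2.0","id":1,"result":null}`,
		`{"jsonrpc":"2.0","id":1}`,
	} {
		kind, err := classify([]byte(raw))
		fmt.Println(kind, err)
	}
}
```

The middle input is the case that motivates the extra care: a response whose result is null is still a valid response, so presence of the member matters, not its value.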

Conn's read goroutine

The read goroutine is started when Conn is constructed:

// The read goroutine has two major steps:
// read in a loop, then release resources after the loop exits.
func (c *Conn) readMessages(ctx context.Context) {
  var err error
  for err == nil {

    // Read data from the stream.
    // A failed read exits the for loop; this is the only way out of the loop.
    var m anyMessage
    err = c.stream.ReadObject(&m)
    if err != nil {
      break
    }

    switch {

    // For a Request, run the onRecv hooks first, then call the Handler to process it.
    case m.request != nil:
      for _, onRecv := range c.onRecv {
        onRecv(m.request, nil)
      }
      c.h.Handle(ctx, c, m.request)

    case m.response != nil:
      resp := m.response
      if resp != nil {
        id := resp.ID

        // For a Response, first remove the call from the pending list:
        // receiving the response means the request has come full circle.
        c.mu.Lock()
        call := c.pending[id]
        delete(c.pending, id)
        c.mu.Unlock()

        if call != nil {
          call.response = resp
        }

        // Post-processing hooks for the Response.
        if len(c.onRecv) > 0 {
          var req *Request
          if call != nil {
            req = call.request
          }
          for _, onRecv := range c.onRecv {
            onRecv(req, resp)
          }
        }

        // The final handling falls into three cases.
        // When a matching call exists, its request-response life cycle is marked as finished.
        switch {
        case call == nil:
          c.logger.Printf(
            "jsonrpc2: ignoring response #%s with no corresponding request\n",
            id)

        case resp.Error != nil:
          call.done <- resp.Error
          close(call.done)

        default:
          call.done <- nil
          close(call.done)
        }
      }
    }
  }

  // After a stream read fails, the loop exits and resources are released.
  c.sending.Lock()
  c.mu.Lock()
  c.shutdown = true
  closing := c.closing
  if err == io.EOF {
    if closing {
      err = ErrClosed
    } else {
      err = io.ErrUnexpectedEOF
    }
  }
  for _, call := range c.pending {
    call.done <- err
    close(call.done)
  }
  c.mu.Unlock()
  c.sending.Unlock()
  if err != io.ErrUnexpectedEOF && !closing {
    c.logger.Printf("jsonrpc2: protocol error: %v\n", err)
  }
  close(c.disconnect)
}

Conn’s implementation of the JSONRPC2 interface

In the JSONRPC2 interface, Call sends a standard request, Notify sends a notification, and Close closes the underlying connection.

type JSONRPC2 interface {
  Call(ctx context.Context, method string, params,
    result interface{}, opt ...CallOption) error
  Notify(ctx context.Context, method string,
    params interface{}, opt ...CallOption) error
  Close() error
}

Implementation of Call:

func (c *Conn) Call(ctx context.Context, method string, params,
  result interface{}, opts ...CallOption) error {

  // Build the request.
  req := &Request{Method: method}
  if err := req.SetParams(params); err != nil {
    return err
  }

  // Pre-process the request by applying the call options.
  for _, opt := range opts {
    if opt == nil {
      continue
    }
    if err := opt.apply(req); err != nil {
      return err
    }
  }

  // Send the request.
  call, err := c.send(ctx, &anyMessage{request: req}, true)
  if err != nil {
    return err
  }

  // Wait for the result.
  select {

  // The call returned (a response was read from the stream).
  case err, ok := <-call.done:
    if !ok {
      err = ErrClosed
    }
    if err != nil {
      return err
    }
    if result != nil {
      if call.response.Result == nil {
        call.response.Result = &jsonNull
      }

      // Finally, decode the result of the response into result.
      if err := json.Unmarshal(*call.response.Result, result); err != nil {
        return err
      }
    }
    return nil

  // Timed out or canceled.
  case <-ctx.Done():
    return ctx.Err()
  }
}

The implementation of Notify differs slightly from a standard request: a notification has no response, so Notify returns as soon as the message is sent:

func (c *Conn) Notify(ctx context.Context,
  method string, params interface{}, opts ...CallOption) error {
  req := &Request{Method: method, Notif: true}
  if err := req.SetParams(params); err != nil {
    return err
  }
  for _, opt := range opts {
    if opt == nil {
      continue
    }
    if err := opt.apply(req); err != nil {
      return err
    }
  }
  _, err := c.send(ctx, &anyMessage{request: req}, false)
  return err
}

Notify is similar to Call in implementation, except that it does not wait for a response. In addition, a Notify request carries no id, whereas the pre-processing for Call must assign an ID to the Request.

The implementation of Close:

func (c *Conn) Close() error {
  c.mu.Lock()
  if c.shutdown || c.closing {
    c.mu.Unlock()
    return ErrClosed
  }
  c.closing = true
  c.mu.Unlock()
  return c.stream.Close()
}

Here are the other methods of Conn:

// send is the function through which both requests and responses are sent.
func (c *Conn) send(_ context.Context, m *anyMessage,
  wait bool) (cc *call, err error) {
  c.sending.Lock()
  defer c.sending.Unlock()

  var id ID

  c.mu.Lock()
  if c.shutdown || c.closing {
    c.mu.Unlock()
    return nil, ErrClosed
  }

  // A standard request is tracked in the pending list;
  // each entry represents the full life cycle of one request.
  if m.request != nil && wait {
    cc = &call{request: m.request, seq: c.seq, done: make(chan error, 1)}
    if !m.request.ID.IsString && m.request.ID.Num == 0 {
      m.request.ID.Num = c.seq
    }
    id = m.request.ID
    c.pending[id] = cc
    c.seq++
  }
  c.mu.Unlock()

  // Common pre-processing hooks run before sending.
  if len(c.onSend) > 0 {
    var (
      req  *Request
      resp *Response
    )
    switch {
    case m.request != nil:
      req = m.request
    case m.response != nil:
      resp = m.response
    }
    for _, onSend := range c.onSend {
      onSend(req, resp)
    }
  }

  // On error there is nothing to wait for: remove the call from the pending list.
  defer func() {
    if err != nil {
      if cc != nil {
        c.mu.Lock()
        delete(c.pending, id)
        c.mu.Unlock()
      }
    }
  }()

  // Use the ObjectStream to write the JSON-RPC object to the stream.
  if err := c.stream.WriteObject(m); err != nil {
    return nil, err
  }
  return cc, nil
}

Conn.Reply sends a successful response; Conn.ReplyWithError sends a failed response. The response carries the id of the request it answers.

func (c *Conn) Reply(ctx context.Context, id ID, result interface{}) error {
  resp := &Response{ID: id}
  if err := resp.SetResult(result); err != nil {
    return err
  }
  _, err := c.send(ctx, &anyMessage{response: resp}, false)
  return err
}

func (c *Conn) ReplyWithError(
  ctx context.Context, id ID, respErr *Error) error {
  _, err := c.send(ctx, &anyMessage{
      response: &Response{ID: id, Error: respErr}}, false)
  return err
}

func (c *Conn) SendResponse(ctx context.Context, resp *Response) error {
  _, err := c.send(ctx, &anyMessage{response: resp}, false)
  return err
}

Finally, there is a method for obtaining the disconnect channel:

func (c *Conn) DisconnectNotify() <-chan struct{} {
  return c.disconnect
}
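
The closed-channel idiom behind DisconnectNotify can be sketched in isolation. The link type and its methods are hypothetical, and sync.Once is an addition of this sketch to make shutdown safe to call twice; the code above closes the channel exactly once at the end of readMessages instead.

```go
package main

import (
	"fmt"
	"sync"
)

// link is a hypothetical, minimal stand-in for Conn.
type link struct {
	once       sync.Once
	disconnect chan struct{}
}

func newLink() *link {
	return &link{disconnect: make(chan struct{})}
}

// shutdown closes the disconnect channel; repeated calls are harmless.
func (l *link) shutdown() {
	l.once.Do(func() { close(l.disconnect) })
}

// DisconnectNotify returns a channel that is closed on disconnect.
func (l *link) DisconnectNotify() <-chan struct{} {
	return l.disconnect
}

// isDown checks the state without blocking.
func isDown(l *link) bool {
	select {
	case <-l.DisconnectNotify():
		return true
	default:
		return false
	}
}

func main() {
	l := newLink()
	fmt.Println(isDown(l))

	go l.shutdown()
	<-l.DisconnectNotify() // blocks until the channel is closed
	fmt.Println(isDown(l))
}
```

Closing a channel, rather than sending on it, wakes every receiver at once and keeps reporting the closed state to receivers that arrive later.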

Reanalysis

By now we have gone through the source code of the jsonrpc2 package, but only piece by piece, without an overall picture. Please jump to the websocket subpackage.

So far, the package provides the following extension points:

  • OnSend/OnRecv: hooks that run before sending and after receiving
  • Handler: how requests are processed, i.e. the business logic
  • ObjectStream: the transport; the sub-package provides a websocket-based one

Analyzing the package, we realized that much of the structure exists for testing purposes, and that jsonrpc2 as a whole provides relatively simple functionality. Apart from the testing part, the package is very simple.

The last piece

bufferedObjectStream is an implementation of ObjectStream based on bufio; the subpackage sourcegraph/jsonrpc2/websocket provides the websocket-based jsonrpc2 stream.

type ObjectStream struct {
  conn *ws.Conn
}

func NewObjectStream(conn *ws.Conn) ObjectStream {
  return ObjectStream{conn: conn}
}

func (t ObjectStream) WriteObject(obj interface{}) error {
  return t.conn.WriteJSON(obj)
}

func (t ObjectStream) ReadObject(v interface{}) error {
  err := t.conn.ReadJSON(v)
  if e, ok := err.(*ws.CloseError); ok {
    if e.Code == ws.CloseAbnormalClosure &&
        e.Text == io.ErrUnexpectedEOF.Error() {
      err = io.ErrUnexpectedEOF
    }
  }
  return err
}

func (t ObjectStream) Close() error {
  return t.conn.Close()
}

The ws here refers to gorilla/websocket.

With this subpackage, all sending and receiving goes over a websocket.

Use in ion-sfu

Only the Conn object is used, and the code waits on the disconnect channel.

jc := jsonrpc2.NewConn(r.Context(), websocketjsonrpc2.NewObjectStream(c), p)
<-jc.DisconnectNotify()

So sourcegraph/jsonrpc2 has many extension points, but the ion project only uses ObjectStream and Handler. ObjectStream is provided by the websocket sub-package; the Handler still needs explaining:

The third parameter of jsonrpc2.NewConn is the Handler. We first analyze when it is called and then what it contains.

As the analysis of Conn above showed, constructing a Conn starts a read goroutine. Its for loop keeps reading from the stream (here, the websocket), and whenever it reads a Request it calls the Handler:

c.h.Handle(ctx, c, m.request)

Only at this point is Handle() of the Handler interface called. Now that we know when the Handler is called, let's look at the specific processing logic:

s := sfu.NewSFU(conf)
p := server.NewJSONSignal(sfu.NewPeer(s))