go pub/sub

You are probably familiar with the Publish/Subscribe pattern. Developers often use third-party open source tools such as Redis, NSQ, or NATS to implement subscriptions. This article shows how to write a stand-alone Pub/Sub implementation in Go. Within a single process it is very lightweight, easy to implement, and does not depend on any third-party service. The sections below implement a Publisher and Subscribers using a single-topic subscription mechanism.

Subscriber

The first step is to create a Hub that accepts multiple Subscribers. Its structure is as follows.

type hub struct {
  sync.Mutex
  subs map[*subscriber]struct{}
}

func newHub() *hub {
  return &hub{
    subs: map[*subscriber]struct{}{},
  }
}

newHub initializes the subscribers map. A map is used because it makes unsubscribing easier to implement later. Next, create the subscriber structure.

type message struct {
  data []byte
}

type subscriber struct {
  sync.Mutex

  name    string
  handler chan *message
  quit    chan struct{}
}

The name field identifies the subscriber. Next, add a run function that receives messages after a successful subscription.

func (s *subscriber) run(ctx context.Context) {
  for {
    select {
    case msg := <-s.handler:
      log.Println(s.name, string(msg.data))
    case <-s.quit:
      return
    case <-ctx.Done():
      return
    }
  }
}

run receives messages from the channel using for and select. Below is the initialization of a single Subscriber.

func newSubscriber(name string) *subscriber {
  return &subscriber{
    name:    name,
    handler: make(chan *message, 100),
    quit:    make(chan struct{}),
  }
}

Note that each subscriber receives messages from the Hub through a buffered channel. Adjust the buffer size to suit your system. After initializing the subscriber, register it with the Hub to subscribe.

func (h *hub) subscribe(ctx context.Context, s *subscriber) error {
  h.Lock()
  h.subs[s] = struct{}{}
  h.Unlock()

  go s.run(ctx)

  return nil
}

The subscriber is stored in the map, and a goroutine runs in the background to receive its messages.

Publisher

Next, the Publisher receives a message and delivers it to all Subscribers. In the previous step, the subscriber implemented the run function to receive published messages. Now let's see how to implement publishing.

func (h *hub) publish(ctx context.Context, msg *message) error {
  h.Lock()
  for s := range h.subs {
    s.publish(ctx, msg)
  }
  h.Unlock()

  return nil
}

The for loop iterates over all subscribers and passes the message to each one. Next, look at how the subscriber's publish method is implemented.

func (s *subscriber) publish(ctx context.Context, msg *message) {
  select {
  case <-ctx.Done():
    return
  case s.handler <- msg:
  default:
  }
}

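Here select with a default case makes the send non-blocking, so the publisher is never blocked. Without select + default, a subscriber that processes messages too slowly would fill its buffer and block the publisher, and with it the whole system. The trade-off is that a message is dropped for any subscriber whose buffer is full.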
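To make the non-blocking behavior concrete, here is a minimal sketch that isolates the select + default pattern. The trySend helper and the buffer size of 2 are illustrative assumptions, not part of the original code.

```go
package main

import "fmt"

// trySend is a hypothetical helper that mirrors the select + default
// pattern: it reports whether the value was queued or dropped.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		// Buffer is full: drop the value instead of blocking the caller.
		return false
	}
}

func main() {
	ch := make(chan string, 2) // small buffer to force the "full" case
	for _, v := range []string{"m1", "m2", "m3"} {
		fmt.Println(v, trySend(ch, v))
	}
	// Prints:
	// m1 true
	// m2 true
	// m3 false
}
```

Since nothing reads from the channel here, the third send finds the buffer full and falls through to default. In the hub, the same thing happens to a subscriber that falls more than 100 messages behind.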

Unsubscribe

If you can subscribe, you also need to be able to unsubscribe. That means removing the subscriber from the map, which the hub's unsubscribe function does.

func (h *hub) unsubscribe(ctx context.Context, s *subscriber) error {
  h.Lock()
  delete(h.subs, s)
  h.Unlock()
  close(s.quit)
  return nil
}
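One caveat with the listing above: closing an already closed channel panics in Go, so calling unsubscribe twice for the same subscriber would panic. Below is a minimal sketch of one way to guard against that. The quitOnce field and the stop method are hypothetical additions, not part of the original code.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a reduced stand-in for the article's type.
type subscriber struct {
	quit     chan struct{}
	quitOnce sync.Once // hypothetical field, not in the original struct
}

// stop closes quit at most once, so repeated calls do not panic.
func (s *subscriber) stop() {
	s.quitOnce.Do(func() { close(s.quit) })
}

func main() {
	s := &subscriber{quit: make(chan struct{})}
	s.stop()
	s.stop() // safe: the second call is a no-op

	// Receiving from a closed channel returns immediately with ok == false.
	_, open := <-s.quit
	fmt.Println(open) // prints "false"
}
```

With this in place, the hub's unsubscribe could call s.stop() instead of close(s.quit) directly.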

Besides unsubscribe, the context can also be used to cancel a subscription: if the developer calls cancel(), the subscription should be cancelled as well. To support this, modify the subscribe function.

func (h *hub) subscribe(ctx context.Context, s *subscriber) error {
  h.Lock()
  h.subs[s] = struct{}{}
  h.Unlock()

  go func() {
    select {
    case <-s.quit:
    case <-ctx.Done():
      h.Lock()
      delete(h.subs, s)
      h.Unlock()
    }
  }()

  go s.run(ctx)

  return nil
}

Note that the go func() listens on ctx.Done(), so cancel() can be called anywhere in the program to remove the subscriber. The subscriber structure also has a quit channel, which is closed on a manual unsubscribe. This lets the watcher goroutine exit normally, so the number of goroutines in the system does not keep growing.

Practical example

After completing the above steps, open main.go and start writing the main program.

package main

import (
  "context"
  "time"
)

func main() {
  ctx := context.Background()
  h := newHub()
  sub01 := newSubscriber("sub01")
  sub02 := newSubscriber("sub02")
  sub03 := newSubscriber("sub03")

  h.subscribe(ctx, sub01)
  h.subscribe(ctx, sub02)
  h.subscribe(ctx, sub03)

  _ = h.publish(ctx, &message{data: []byte("test01")})
  _ = h.publish(ctx, &message{data: []byte("test02")})
  _ = h.publish(ctx, &message{data: []byte("test03")})
  time.Sleep(1 * time.Second)

  h.unsubscribe(ctx, sub03)
  _ = h.publish(ctx, &message{data: []byte("test04")})
  _ = h.publish(ctx, &message{data: []byte("test05")})

  time.Sleep(1 * time.Second)
}

Verify that the messages come out as expected. In addition, to verify that all goroutines shut down properly, use go.uber.org/goleak in the tests.

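package main

import (
  "context"
  "testing"
  "time"

  "github.com/stretchr/testify/assert" // assumed source of assert.Equal
  "go.uber.org/goleak"
)

// TestMain fails the test run if any goroutine is still alive at exit.
func TestMain(m *testing.M) {
  goleak.VerifyTestMain(m)
}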

func TestSubscriber(t *testing.T) {
  ctx := context.Background()
  h := newHub()
  sub01 := newSubscriber("sub01")
  sub02 := newSubscriber("sub02")
  sub03 := newSubscriber("sub03")

  h.subscribe(ctx, sub01)
  h.subscribe(ctx, sub02)
  h.subscribe(ctx, sub03)

  assert.Equal(t, 3, h.subscribers())

  h.unsubscribe(ctx, sub01)
  h.unsubscribe(ctx, sub02)
  h.unsubscribe(ctx, sub03)

  assert.Equal(t, 0, h.subscribers())
}
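The test above calls h.subscribers(), which does not appear in the earlier listings. A minimal sketch of what it might look like, assuming it simply returns the number of entries in the map while holding the hub's lock (the reduced types here are stand-ins for the article's own):

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a reduced stand-in for the article's type.
type subscriber struct{ name string }

type hub struct {
	sync.Mutex
	subs map[*subscriber]struct{}
}

// subscribers returns the current number of subscribers.
// Assumed behavior: the original implementation is not shown.
func (h *hub) subscribers() int {
	h.Lock()
	defer h.Unlock()
	return len(h.subs)
}

func main() {
	h := &hub{subs: map[*subscriber]struct{}{}}
	h.subs[&subscriber{name: "sub01"}] = struct{}{}
	fmt.Println(h.subscribers()) // prints "1"
}
```

Taking the lock matters because subscribe, unsubscribe, and the context watcher goroutine all modify the map concurrently.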

Test cancelling a subscriber through its context.

func TestCancelSubscriber(t *testing.T) {
  ctx := context.Background()
  h := newHub()
  sub01 := newSubscriber("sub01")
  sub02 := newSubscriber("sub02")
  sub03 := newSubscriber("sub03")

  h.subscribe(ctx, sub01)
  h.subscribe(ctx, sub02)
  ctx03, cancel := context.WithCancel(ctx)
  h.subscribe(ctx03, sub03)

  assert.Equal(t, 3, h.subscribers())

  // cancel subscriber 03
  cancel()
  time.Sleep(100 * time.Millisecond)
  assert.Equal(t, 2, h.subscribers())

  h.unsubscribe(ctx, sub01)
  h.unsubscribe(ctx, sub02)

  assert.Equal(t, 0, h.subscribers())
}

Summary

As you can see, Pub/Sub can be implemented in Go with a simple buffered channel, and you can decide whether to bring in a third-party Pub/Sub tool based on your needs. Finally, the complete code is attached; I hope you find it helpful.