While reading the source code of the open source project Grafana recently, I found that it implements an event bus mechanism that is used in many places throughout the project. It works well, and the code is relatively simple.

https://github.com/grafana/grafana/blob/main/pkg/bus/bus.go

1. Registration and invocation

This pattern appears everywhere in the project.

func ValidateOrgAlert(c *models.ReqContext) {
    id := c.ParamsInt64(":alertId")
    
    query := models.GetAlertByIdQuery{Id: id}

    if err := bus.Dispatch(&query); err != nil {
        c.JsonApiErr(404, "Alert not found", nil)
        return
    }

    if c.OrgId != query.Result.OrgId {
        c.JsonApiErr(403, "You are not allowed to edit/view alert", nil)
        return
    }
}

The key line is bus.Dispatch(&query). Its argument is a GetAlertByIdQuery struct, defined as follows.

type GetAlertByIdQuery struct {
    Id int64
    Result *Alert
}

As the name suggests, this query looks up an Alert by Id. The Alert struct holds the result; its definition is omitted here.

Reading the source shows that Dispatch ends up calling the GetAlertById function, which stores its result in the Result field of the query and then returns.

func GetAlertById(query *models.GetAlertByIdQuery) error {
    alert := models.Alert{}
    has, err := x.ID(query.Id).Get(&alert)
    if !has {
        return fmt.Errorf("could not find alert")
    }
    if err != nil {
        return err
    }
    query.Result = &alert
    return nil
}

This raises a few questions. How does it work? What exactly does Dispatch do? What are the benefits of doing it this way?

Let me answer them one by one.

First, before a message can be dispatched, its handler must be registered by calling AddHandler. The init functions in this project contain a lot of code like this.

func init() {
    bus.AddHandler("sql", SaveAlerts)
    bus.AddHandler("sql", HandleAlertsQuery)
    bus.AddHandler("sql", GetAlertById)
    ...
}

The logic of this method is also very simple. Registration stores the handler in a map keyed by the type name of the handler's first parameter. Dispatch then takes the type name of its argument, looks up the previously registered function under that name, and calls it through reflection.

The Bus structure has several map members because the project defines three kinds of handlers. The first is a normal handler, the kind shown earlier. The second is a handler that takes a context. The third is a handler used for event subscription, where multiple listeners are registered for one event. When the event is triggered, the listeners are called in turn, which is the observer pattern.

// InProcBus defines the bus structure
type InProcBus struct {
    handlers        map[string]HandlerFunc
    handlersWithCtx map[string]HandlerFunc
    listeners       map[string][]HandlerFunc
    txMng           TransactionManager
}

Looking at the source code, the AddHandler method is as follows.

func (b *InProcBus) AddHandler(handler HandlerFunc) {
    handlerType := reflect.TypeOf(handler)
    queryTypeName := handlerType.In(0).Elem().Name() // Get the type name of the function's first parameter, which in the above example is GetAlertByIdQuery
    b.handlers[queryTypeName] = handler
}
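
To see what that reflection chain produces, here is a small standalone sketch. The keyFor helper is a hypothetical name introduced only for this illustration, and GetAlertByIdQuery is reduced to a stand-in type so the snippet compiles on its own.

```go
package main

import (
	"fmt"
	"reflect"
)

// GetAlertByIdQuery is a simplified stand-in for the real query type.
type GetAlertByIdQuery struct{ Id int64 }

// GetAlertById is a stub handler with the same shape as the real one.
func GetAlertById(query *GetAlertByIdQuery) error { return nil }

// keyFor (hypothetical helper) derives the map key the same way AddHandler does.
func keyFor(handler interface{}) string {
	t := reflect.TypeOf(handler) // func(*GetAlertByIdQuery) error
	// In(0) is *GetAlertByIdQuery, Elem() strips the pointer,
	// Name() returns the bare type name without the package path.
	return t.In(0).Elem().Name()
}

func main() {
	fmt.Println(keyFor(GetAlertById)) // prints "GetAlertByIdQuery"
}
```

Because Name() drops the package path, two query types with the same name in different packages would map to the same key, so type names need to be unique across everything registered on the bus.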

The source code for the Dispatch method is as follows.

func (b *InProcBus) Dispatch(msg Msg) error {
    var msgName = reflect.TypeOf(msg).Elem().Name()

    withCtx := true
    handler := b.handlersWithCtx[msgName] // Look up the registered function by message type name, trying the handlers that take a Ctx first
    if handler == nil {
        withCtx = false
        handler = b.handlers[msgName]
        if handler == nil {
            return ErrHandlerNotFound
        }
    }
    var params = []reflect.Value{}
    if withCtx {
        // If the handler found takes a Ctx, pass it a default Background Ctx
        params = append(params, reflect.ValueOf(context.Background()))
    }
    params = append(params, reflect.ValueOf(msg))

    ret := reflect.ValueOf(handler).Call(params) // Call the function through reflection
    err := ret[0].Interface()
    if err == nil {
        return nil
    }
    return err.(error)
}
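
Putting registration and dispatch together, the whole mechanism fits in a few dozen lines. The following is a minimal sketch of the same idea outside Grafana, not the project's actual implementation: MiniBus, GetUserNameQuery and getUserName are hypothetical names made up for this example, and only the plain handler map is kept.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// ErrHandlerNotFound is returned when no handler is registered for a message type.
var ErrHandlerNotFound = errors.New("handler not found")

// MiniBus (hypothetical) maps a message type name to its handler function.
type MiniBus struct {
	handlers map[string]interface{}
}

func NewMiniBus() *MiniBus {
	return &MiniBus{handlers: make(map[string]interface{})}
}

// AddHandler registers a func(*T) error under the type name of T.
func (b *MiniBus) AddHandler(handler interface{}) {
	name := reflect.TypeOf(handler).In(0).Elem().Name()
	b.handlers[name] = handler
}

// Dispatch finds the handler by the message's type name and calls it via reflection.
func (b *MiniBus) Dispatch(msg interface{}) error {
	name := reflect.TypeOf(msg).Elem().Name()
	handler, ok := b.handlers[name]
	if !ok {
		return ErrHandlerNotFound
	}
	ret := reflect.ValueOf(handler).Call([]reflect.Value{reflect.ValueOf(msg)})
	// A nil error comes back as a nil interface, so the assertion fails and we return nil.
	if err, ok := ret[0].Interface().(error); ok {
		return err
	}
	return nil
}

// GetUserNameQuery is a made-up query: input in ID, output in Result.
type GetUserNameQuery struct {
	ID     int64
	Result string
}

// getUserName is a made-up handler that only knows user 1.
func getUserName(q *GetUserNameQuery) error {
	if q.ID != 1 {
		return fmt.Errorf("user %d not found", q.ID)
	}
	q.Result = "admin"
	return nil
}

func main() {
	b := NewMiniBus()
	b.AddHandler(getUserName)

	q := GetUserNameQuery{ID: 1}
	if err := b.Dispatch(&q); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(q.Result) // prints "admin"
}
```

The caller only ever touches the query struct and Dispatch. It never imports the package that contains getUserName, which is where the decoupling comes from.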

The AddHandlerCtx and DispatchCtx methods work in basically the same way, but take an additional context parameter that can be used for timeout control or other purposes.

2. Subscribe and Publish

Besides these, there are two more methods, AddEventListener and Publish, which handle the subscription and publication of events.

func (b *InProcBus) AddEventListener(handler HandlerFunc) {
    handlerType := reflect.TypeOf(handler)
    eventName := handlerType.In(0).Elem().Name()
    _, exists := b.listeners[eventName]
    if !exists {
        b.listeners[eventName] = make([]HandlerFunc, 0)
    }
    b.listeners[eventName] = append(b.listeners[eventName], handler)
}

The source code shows that multiple handler functions can be registered for one event, and that Publish calls the registered functions in turn. The logic is not complicated.

func (b *InProcBus) Publish(msg Msg) error {
    var msgName = reflect.TypeOf(msg).Elem().Name()
    var listeners = b.listeners[msgName]

    var params = make([]reflect.Value, 1)
    params[0] = reflect.ValueOf(msg)

    for _, listenerHandler := range listeners {
        ret := reflect.ValueOf(listenerHandler).Call(params)
        e := ret[0].Interface()
        if e != nil {
            err, ok := e.(error)
            if ok {
                return err
            }
            return fmt.Errorf("expected listener to return an error, got '%T'", e)
        }
    }
    return nil
}

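There is one drawback here: all listeners are called sequentially rather than in separate goroutines, so Publish can be slow if many listeners are registered. Publish also returns as soon as one listener returns an error, so the remaining listeners are not called in that case.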

3. Advantages

Some people may wonder why take such a detour when you could just call the function directly.

Moreover, every call goes through reflection, which costs some performance.

I think the main points are as follows.

  1. Callers depend only on the query type rather than on the implementation, so the logic is clear and decoupled
  2. Unit testing is convenient, because a handler can be replaced with a fake
  3. Performance is not the main consideration here, even though reflection does add overhead