Introduction to Mutex

The Mutex structure contains two fields.

  • Field state: the current state of the mutex.
  • Field sema: a semaphore used to block (put to sleep) and wake up waiting goroutines.
type Mutex struct {
    state int32
    sema  uint32
}

In Go 1.9, a starvation mode was added to make the lock fairer. It addresses the case where a waiting goroutine might go a long time without ever acquiring the lock, and it limits the unfair waiting time to 1 millisecond.

The state field is more complex. Its three lowest bits represent mutexLocked, mutexWoken, and mutexStarving. Because state is 32 bits long in total, the remaining 29 bits hold the number of goroutines waiting for the mutex to be released, so the count can represent on the order of 1<<(32-3) waiters.

The code representation is as follows.

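const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken              // a waiter has been woken
    mutexStarving           // mutex is in starvation mode
    mutexWaiterShift = iota // bits to shift to reach the waiter count

    starvationThresholdNs = 1e6 // 1ms, used by lockSlow below
)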
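
To make the bit layout concrete, here is a minimal sketch that decodes a state word into its flags and waiter count. The constants are local copies (the ones in the sync package are unexported), and stateInfo and decodeState are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// Local copies of the sync package constants, redeclared here because
// the originals are unexported.
const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// stateInfo is a hypothetical helper type for this example only.
type stateInfo struct {
	locked, woken, starving bool
	waiters                 int32
}

// decodeState splits a state word into its three flag bits and the
// waiter count stored in the remaining upper bits.
func decodeState(state int32) stateInfo {
	return stateInfo{
		locked:   state&mutexLocked != 0,
		woken:    state&mutexWoken != 0,
		starving: state&mutexStarving != 0,
		waiters:  state >> mutexWaiterShift,
	}
}

func main() {
	// Locked, with two waiting goroutines: 2<<3 | 1 = 17.
	state := int32(2<<mutexWaiterShift | mutexLocked)
	fmt.Printf("%+v\n", decodeState(state))
}
```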

Locking process

Fast path

func (m *Mutex) Lock() { 
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    } 
    m.lockSlow()
}

When locking, Lock first tries to acquire the lock directly with a CAS that changes state from 0 to mutexLocked. If the CAS succeeds, the lock has been acquired and Lock returns immediately; otherwise it falls through to lockSlow.
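
The fast path can be tried in isolation. The sketch below is not the sync implementation; tryFastPath is a hypothetical helper that performs only the CAS step on a plain int32, to show that the CAS succeeds only when state is exactly 0.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const mutexLocked = 1 // same value as in the sync package

// tryFastPath is a hypothetical helper that performs only the fast-path
// step of Lock: it succeeds if and only if state was exactly 0.
func tryFastPath(state *int32) bool {
	return atomic.CompareAndSwapInt32(state, 0, mutexLocked)
}

func main() {
	var state int32
	fmt.Println(tryFastPath(&state)) // true: state went from 0 to mutexLocked
	fmt.Println(tryFastPath(&state)) // false: already locked, Lock would call lockSlow
}
```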

lockSlow

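// Time at which this goroutine started waiting
var waitStartTime int64
// Whether this goroutine is starving
starving := false
// Whether this goroutine has been woken up
awoke := false
// Number of spin iterations so far
iter := 0
// Current state of the lock
old := m.state
for {
    // The lock is held and not in starvation mode: try to spin
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // Try to set mutexWoken so that Unlock does not wake other blocked goroutines
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        // Spin
        runtime_doSpin()
        // Increment the spin count
        iter++
        // Reload the current lock state
        old = m.state
        continue
    }
    ...
}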

After entering lockSlow, the first step is to decide whether the goroutine may spin. Part of that decision is the following expression.

old&(mutexLocked|mutexStarving) == mutexLocked

This expression is true only when the lock is currently held and is not in starvation mode. runtime_canSpin(iter) then checks, among other conditions, whether iter is still under the spin limit. If both hold, execution continues into the body of the if.
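
The mask expression is easy to check against all four combinations of the two bits. In this sketch lockedNotStarving is a hypothetical helper name, and the constants are local copies of the unexported ones in sync.

```go
package main

import "fmt"

// Local copies of the sync package constants.
const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
)

// lockedNotStarving reports whether a state passes the first half of the
// spin check: the locked bit is set and the starving bit is clear.
func lockedNotStarving(old int32) bool {
	return old&(mutexLocked|mutexStarving) == mutexLocked
}

func main() {
	fmt.Println(lockedNotStarving(0))                         // false: not locked
	fmt.Println(lockedNotStarving(mutexLocked))               // true
	fmt.Println(lockedNotStarving(mutexStarving))             // false: starving
	fmt.Println(lockedNotStarving(mutexLocked | mutexStarving)) // false: starving
}
```

The mutexWoken bit and the waiter count do not affect the result, because the mask only keeps the locked and starving bits.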

The inner if contains four checks.

  • !awoke: the current goroutine has not already set the woken flag itself.
  • old&mutexWoken == 0: no other goroutine is currently marked as woken.
  • old>>mutexWaiterShift != 0: at least one goroutine is currently waiting.
  • The CAS that sets the mutexWoken bit of state (old|mutexWoken) succeeds.

If all four are satisfied, awoke is set to true. In either case the goroutine then spins, increments the spin count, and reloads state.

Continuing on.

new := old
if old&mutexStarving == 0 {
    // Not in starvation mode: set the mutexLocked bit to try to take the lock
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
    // The lock is held or in starvation mode: increment the waiter count
    new += 1 << mutexWaiterShift
}
// This goroutine is starving and the lock is held: set the mutexStarving bit
// to switch the mutex into starvation mode
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}
// awoke is true if this goroutine set mutexWoken while spinning above
if awoke {
    if new&mutexWoken == 0 {
        throw("sync: inconsistent mutex state")
    }
    // Clear the woken flag
    new &^= mutexWoken
}

Execution reaches this point in one of the following cases: 1. spinning is no longer allowed (for example, the spin count has been exceeded); 2. the lock is not currently held; 3. the lock is in starvation mode.

The first check sets new |= mutexLocked whenever the old state is not in starvation mode. If the lock is already held, this just sets the mutexLocked bit to 1 again, which changes nothing.

If old is in starvation mode or is already locked, the waiter count is incremented by one, indicating that this goroutine will not acquire the lock below and will wait instead.

If starving is true, meaning the current goroutine is starving, and old is already locked, then new |= mutexStarving sets the mutexStarving bit to 1.

If awoke was set during spinning, new &^= mutexWoken clears the mutexWoken bit here. The current goroutine is likely to be suspended next and will need another goroutine to wake it when the lock is released. If Unlock found the mutexWoken bit still set, it would not wake anyone, and this goroutine could never wake up to try for the lock again.
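
The four adjustments can be collected into a pure function and tried on sample states. This is only a sketch: nextState is a hypothetical name, the constants are local copies of the unexported ones in sync, and the consistency check that throws in the real code is left out.

```go
package main

import "fmt"

// Local copies of the sync package constants.
const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// nextState mirrors how lockSlow derives the new state from old.
// It is a hypothetical helper and omits the "inconsistent mutex state" check.
func nextState(old int32, starving, awoke bool) int32 {
	next := old
	if old&mutexStarving == 0 {
		next |= mutexLocked // try to take the lock
	}
	if old&(mutexLocked|mutexStarving) != 0 {
		next += 1 << mutexWaiterShift // this goroutine will have to wait
	}
	if starving && old&mutexLocked != 0 {
		next |= mutexStarving // switch to starvation mode
	}
	if awoke {
		next &^= mutexWoken // clear the woken flag
	}
	return next
}

func main() {
	fmt.Println(nextState(0, false, false))                     // 1: lock taken
	fmt.Println(nextState(mutexLocked, false, false))           // 9: locked, 1 waiter
	fmt.Println(nextState(mutexLocked, true, false))            // 13: locked, starving, 1 waiter
	fmt.Println(nextState(mutexLocked|mutexWoken, false, true)) // 9: woken bit cleared
}
```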

Continuing on.

if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // 1. The old state was neither locked nor starving, so the CAS acquired the lock
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    // 2. The lock was not acquired. If this goroutine has waited before
    // (waitStartTime != 0), it is queued at the head of the wait queue
    queueLifo := waitStartTime != 0
    // 3. First wait: record when waiting started
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }
    // 4. Block until woken
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    // 5. After waking, check whether this goroutine should count as starving
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    // 6. Check whether the mutex is in starvation mode
    if old&mutexStarving != 0 {
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        // 7. Take the lock and decrement the waiter count
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        if !starving || old>>mutexWaiterShift == 1 {
            // 8. This goroutine is not starving, or it is the last waiter:
            // switch from starvation mode back to normal mode
            delta -= mutexStarving
        }
        // 9. Apply the state change
        atomic.AddInt32(&m.state, delta)
        break
    }
    awoke = true
    iter = 0
} else {
    old = m.state
}

At this point the new state is installed with a CAS. If the CAS fails, old is reloaded and the loop starts over. If it succeeds, the steps are as follows.

  1. Check the old state. If it was neither locked nor starving, break out of the loop: new had mutexLocked set before the CAS, so the lock has already been acquired. The old state must also not be in starvation mode, otherwise the lock cannot be taken this way.
  2. Check whether waitStartTime has been initialized. For a goroutine contending for the lock for the first time, queueLifo is false. For a goroutine that has already waited, queueLifo is true and it is added to the head of the wait queue, so that the goroutine that has waited longest gets priority.
  3. If the wait start time is 0, initialize it.
  4. Block, putting the current goroutine to sleep.
  5. After waking up, check whether this goroutine should be starving and set the starving variable.
  6. Check whether the mutex is in starvation mode. If not, go straight to the next iteration of the for loop to compete for the lock.
  7. Take the lock and decrement the waiter count. This took me a while to understand; it helps to read delta in two steps: first state + mutexLocked, then subtract one from the waiter part of state.
  8. If the current goroutine is not starving, or it is the only waiter, switch from starvation mode back to normal mode.
  9. Apply the state change.

The flow is easier to follow in two parts: before sleeping and after waking.

Before sleeping, the old state determines whether the lock was acquired directly. If old was neither locked nor starving, the code breaks out of the loop, because in that case the CAS itself set the locked bit.

Next, waitStartTime is compared with 0. If it is nonzero, this is not the first pass: the goroutine was woken up and has come back here. It must not go to the tail of the queue and sleep again. Instead it goes to the head of the queue, so that it does not go a long time without getting the lock.

The second part is what happens after waking up. To see how a goroutine gets woken, you can jump ahead to the unlocking section, read it to the end, and then come back.

Right after waking up, the goroutine updates starving: if it was already starving, or if its waiting time exceeds 1ms, starving is set to true.
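
The starvation check is a single boolean expression. The sketch below restates it with time.Duration for readability; updateStarving and starvationThreshold are hypothetical names, while the real code compares raw nanosecond counts against starvationThresholdNs.

```go
package main

import (
	"fmt"
	"time"
)

// starvationThreshold is 1ms, the same value as starvationThresholdNs (1e6 ns).
const starvationThreshold = time.Millisecond

// updateStarving is a hypothetical helper: once a goroutine is starving it
// stays starving, and it becomes starving after waiting longer than 1ms.
func updateStarving(starving bool, waited time.Duration) bool {
	return starving || waited > starvationThreshold
}

func main() {
	fmt.Println(updateStarving(false, 500*time.Microsecond)) // false
	fmt.Println(updateStarving(false, 2*time.Millisecond))   // true
	fmt.Println(updateStarving(true, 0))                     // true
}
```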

Next comes an if check. One detail: because the goroutine has just been woken, the lock state must be reloaded (old = m.state) before the check. If the mutex is not in starvation mode, the goroutine sets awoke to true, resets iter to 0, and goes back to the top of the for loop to compete for the lock again.

If the mutex is in starvation mode, delta is computed to take the lock. The current goroutine can take the lock directly, so the waiter count must be decremented by one. If starving is false (the waiting time did not exceed 1ms), or this is the only waiter, mutexStarving is also subtracted from delta to leave starvation mode.

Finally, delta is added to state with AddInt32. A plain addition is safe here because the mutexLocked bit of state is guaranteed to be 0, the mutexStarving bit is guaranteed to be 1, and the wait queue contains at least the current goroutine before it acquires the lock, so the waiter count can be decremented by 1 directly.
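
A worked example of the delta arithmetic may help. In this sketch handoffDelta is a hypothetical helper that reproduces only the delta computation, with local copies of the unexported constants.

```go
package main

import "fmt"

// Local copies of the sync package constants.
const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// handoffDelta reproduces the delta computed in starvation mode:
// set the locked bit, remove one waiter, and optionally clear starving.
func handoffDelta(old int32, starving bool) int32 {
	delta := int32(mutexLocked - 1<<mutexWaiterShift) // +1 - 8 = -7
	if !starving || old>>mutexWaiterShift == 1 {
		delta -= mutexStarving // leave starvation mode
	}
	return delta
}

func main() {
	twoWaiters := int32(2<<mutexWaiterShift | mutexStarving) // 20
	oneWaiter := int32(1<<mutexWaiterShift | mutexStarving)  // 12

	// Still starving, another waiter remains: stay in starvation mode.
	fmt.Println(twoWaiters + handoffDelta(twoWaiters, true)) // 13: locked, starving, 1 waiter
	// Not starving: drop back to normal mode.
	fmt.Println(twoWaiters + handoffDelta(twoWaiters, false)) // 9: locked, 1 waiter
	// Last waiter: drop back to normal mode even if starving.
	fmt.Println(oneWaiter + handoffDelta(oneWaiter, true)) // 1: locked, no waiters
}
```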

Unlocking process

Fast path

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }
    // AddInt32 returns the value of state after the subtraction
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // A nonzero result means there is more to do: enter unlockSlow
        m.unlockSlow(new)
    }
}

The key step is that AddInt32 clears the mutexLocked bit of state and returns the new value. If the new value is not 0, unlockSlow is called.
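
The effect of the subtraction can be seen on two sample states. This is a sketch on a plain int32, not the sync implementation; releaseLockedBit is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	mutexLocked      = 1 // same value as in the sync package
	mutexWaiterShift = 3 // same value as in the sync package
)

// releaseLockedBit is a hypothetical helper that performs the fast-path
// step of Unlock and reports whether the slow path would be needed.
func releaseLockedBit(state *int32) (newState int32, needSlow bool) {
	newState = atomic.AddInt32(state, -mutexLocked)
	return newState, newState != 0
}

func main() {
	// Locked, nobody waiting: the result is 0 and Unlock is done.
	uncontended := int32(mutexLocked)
	fmt.Println(releaseLockedBit(&uncontended)) // 0 false

	// Locked with one waiter: the waiter count remains, so unlockSlow runs.
	contended := int32(1<<mutexWaiterShift | mutexLocked)
	fmt.Println(releaseLockedBit(&contended)) // 8 true
}
```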

unlockSlow

unlockSlow handles unlocking separately for normal mode and starvation mode.

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // Normal mode
    if new&mutexStarving == 0 {
        old := new
        for {
            // No waiters, or another goroutine is already handling it: return
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Decrement the waiter count, set the mutexWoken flag,
            // and update state with a CAS
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // Wake a waiter from the wait queue
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else { // Starvation mode
        // Hand the lock directly to the waiter at the head of the queue
        runtime_Semrelease(&m.sema, true, 1)
    }
}

In normal mode, if there are no waiters, or if any of mutexLocked, mutexStarving, or mutexWoken is set (meaning another goroutine is already handling things), unlockSlow returns immediately. Otherwise the waiter count is decremented, mutexWoken is set with a CAS, and runtime_Semrelease wakes a waiter from the wait queue.

In starvation mode, runtime_Semrelease is called directly to hand the lock to the next waiter in the queue, and that waiter takes the lock when it is woken.
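
The normal-mode decision can be written as a pure function of the state. In this sketch normalModeWake is a hypothetical name, the constants are local copies, and the CAS retry loop and the semaphore call are left out.

```go
package main

import "fmt"

// Local copies of the sync package constants.
const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// normalModeWake is a hypothetical helper mirroring one pass of the
// normal-mode loop in unlockSlow: it returns the state that would be
// installed by the CAS and whether a waiter would be woken.
func normalModeWake(old int32) (next int32, wake bool) {
	if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
		return old, false // nothing to do
	}
	return (old - 1<<mutexWaiterShift) | mutexWoken, true
}

func main() {
	fmt.Println(normalModeWake(0))                                // 0 false: no waiters
	fmt.Println(normalModeWake(1 << mutexWaiterShift))            // 2 true: wake the only waiter
	fmt.Println(normalModeWake(2 << mutexWaiterShift))            // 10 true: one waiter remains
	fmt.Println(normalModeWake(1<<mutexWaiterShift | mutexLocked)) // 9 false: lock was re-taken
}
```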

Summary

The design of Mutex is very compact. As the code shows, to keep the structure this small, the single state field does the work of four fields. To solve the goroutine starvation problem, Go 1.9 added starvation mode, which makes the lock fairer and limits the unfair waiting time to 1 millisecond. At the same time, the code has become harder to understand, so grasping the ideas above takes some time spent working through the details.