首页

    go rwmutex

    标签:go

    执行这段代码

    package main
    
    import (
        "fmt"
        "math/rand"
        "os"
        "os/signal"
        "sync"
        "time"
    )
    
    var (
        mu          sync.RWMutex
        quit        = make(chan os.Signal)
        ch          = make(chan int)
    )
    
    func init() {
        rand.Seed(time.Now().Unix())
    }
    
    func sleep() int {
        r := rand.Intn(1000)
        time.Sleep(time.Duration(r) * time.Millisecond)
        return r
    }
    
    func writer(i int) {
        sleep()
        mu.Lock()
        fmt.Println("writer start", i)
        r := sleep()
        fmt.Println("writer", i, r)
        mu.Unlock()
    }
    
    func reader(i int) {
        sleep()
        mu.RLock()
        fmt.Println("reader start", i)
        r := sleep()
        fmt.Println("reader", i, r)
        mu.RUnlock()
    }
    
    func main() {
        signal.Notify(quit)
        for i := 1; i <= 3; i++ {
            go writer(i)
        }
        for i := 1; i <= 9; i++ {
            go reader(i)
        }
        <-quit
    }
    

    结果可能不同,一种理想的结果

    reader start 2
    reader start 9
    reader 2 122
    reader start 1
    reader start 7
    reader 7 149
    reader start 3
    reader 9 397
    reader 1 536
    reader 3 393
    writer start 1
    writer 1 728
    reader start 6
    reader start 8
    reader start 5
    reader start 4
    reader 4 61
    reader 5 95
    reader 8 960
    reader 6 968
    writer start 3
    writer 3 528
    writer start 2
    writer 2 192
    

    可以看到

    • reader获得读锁后,其他reader也可以获得读锁,writer需等待reader执行完释放读锁后,才可以争取获得锁
    • writer获得写锁后,执行过程中,其他writer不可以获得写锁,reader不可以获得读锁
    • reader执行完释放读锁后,其他reader,writer都可以争取获得锁
    • writer执行完释放写锁后,如果后面有reader,writer同时准备好了,一定是reader获得锁,这是为了避免后面的reader饥饿(starved)

    第3点由RUnlock(),Lock()源码可以看到

    RUnlock()

    func (rw *RWMutex) RUnlock() {
        ...
        if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
            if r+1 == 0 || r+1 == -rwmutexMaxreader {
                race.Enable()
                thrSw("sync: RUnlock of unlocked RWMutex")
            }
            // A writer is pending.
            if atomic.AddInt32(&rw.readerWait, -1) == 0 {
                // The last reader unblocks the writer.       
                runtime_Semrelease(&rw.WriteSem, false)
            }
        }
        ...
    }
    

    Lock()

    func (rw *RWMutex) Lock() {
        ...
        rw.w.Lock()
        r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader
        if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
            runtime_SemacquireMutex(&rw.writerSem, false)
        }
        ...
    }
    

    RUnlock()第9行,最后一个reader快完成时,释放rw.writerSem,writer因此可以参与竞态,此时,readerCount小于-1,因为第3行对readerCount减1后,readerCount小于0才会进入if块,进入后r等于-1时抛出异常.
    readerCount小于-1,看下面的RLock(),reader可以进入if块,也可以参与竞态.

    第4点由Unlock(),RLock()源码可以看到

    RLock()

    func (rw *RWMutex) RLock() {
        ...
        if atomic.AddInt32(&rw.readerCount, 1) < 0 {
            runtime_SemacquireMutex(&rw.ReadeSem, false)
        }
        ...
    }
    

    Unlock()

    func (rw *RWMutex) Unlock() {
        ...
        r := atomic.AddInt32(&rw.readerCount, rwmutexMaxreader)
        if r >= rwmutexMaxreader {
            race.Enable()
            throw("sync: Unlock of unlocked RWMutex")
        }
        for i := 0; i < int(r); i++ {
            runtime_Semrelease(&rw.readerSem, false)
        }
        rw.w.Unlock()
        ...
    }
    

    readerCount是待处理的reader数量,Unlock()第8-10行,对所有待处理的释放rw.ReadeSem,这时reader可以获得rw.ReadeSem,进而执行后面代码;但是writer由于没有rw.w.Unlock()释放互斥锁,其他writer不能同reader竞争


    不定期更新