且听疯吟

【MIT6.S081】 lab8 lock

2022-11-20

lock

感觉lock lab是自我感觉还算是比较难的lab了, 第二个练习想了很久才想到如何写出正确的代码,特别是中间又莫名出现了 panic: freeing free blocks, 找了很长时间都没有找到问题所在, 还好的是最终解决该问题, 将文件系统 fs 删除后就莫名奇妙好了, 所有的 testcase 全部都可以通过了, 后面需要继续研究一下 lock free 队列的实现.

  • 本章终点讲述操作系统中 sleepwakeup 的实现原理, 感觉还是挺有趣的实现, 当然本质上 sleep的实现原理非常简单, 即将当前线程设置为sleeping状态, 并标记当前线程的 $chan$, 然后将当前线程切换出去, 线程调度程序在运行时,发现当前线程的状态不是 runable时就不会将该线程调度到 cpu 核心上, 从而实现了该线程的睡眠.
    void
    sleep(void *chan, struct spinlock *lk)
    {
    struct proc *p = myproc();

    // Must acquire p->lock in order to
    // change p->state and then call sched.
    // Once we hold p->lock, we can be
    // guaranteed that we won't miss any wakeup
    // (wakeup locks p->lock),
    // so it's okay to release lk.
    if(lk != &p->lock){ //DOC: sleeplock0
    acquire(&p->lock); //DOC: sleeplock1
    release(lk);
    }

    // Go to sleep.
    p->chan = chan;
    p->state = SLEEPING;

    sched();

    // Tidy up.
    p->chan = 0;

    // Reacquire original lock.
    if(lk != &p->lock){
    release(&p->lock);
    acquire(lk);
    }
    }
  • 我们同时也可以观察一下系统调用程序中 sleep 函数的实现, 原理实际上也是调用 sleep lock来实现, 当前如果等待的时间小于 sleep 睡眠的时间,如果当前进程被唤醒,则继续执行 sleep, 否则则释放 ticklocks, 此时进行就可以从调度程序中被唤醒. sleep 系统调用都是标记在ticks中,我们可以看到时间中断中,每次会将标记为ticks的进程进行唤醒,从而实现时间计数.
    uint64
    sys_sleep(void)
    {
    int n;
    uint ticks0;

    if(argint(0, &n) < 0)
    return -1;
    acquire(&tickslock);
    ticks0 = ticks;
    while(ticks - ticks0 < n){
    if(myproc()->killed){
    release(&tickslock);
    return -1;
    }
    sleep(&ticks, &tickslock);
    }
    release(&tickslock);
    return 0;
    }

    void
    clockintr()
    {
    acquire(&tickslock);
    ticks++;
    wakeup(&ticks);
    release(&tickslock);
    }
  • 关于wake的原理实现也非常简单,即将进程的状态由sleeping设置为 runable即可, 此时在进行进程调度时,该进程即可就唤醒,加载到 CPU核心中运行.wakeup时需要传入 chan 参数标记.
    void
    wakeup(void *chan)
    {
    struct proc *p;

    for(p = proc; p < &proc[NPROC]; p++) {
    acquire(&p->lock);
    if(p->state == SLEEPING && p->chan == chan) {
    p->state = RUNNABLE;
    }
    release(&p->lock);
    }
    }
  • lecture中还提到了lost wakeup的问题, 很多细节问题真心值得深入的去思考这些细节实现. 还是认真的学习视频感觉比较.
  • 本章的lab主要以如何减少lock冲突的出发点去优化代码, 同时如何避免死锁的问题去寻求解决办法, 总的来说还算是比较有意思的lab.

实现代码

本次lab的几个程序倒是感觉不是太难,实现代码倒是不多,但是很多值得思考的地方,整个MIT 6.S081的课程总结全部放到github上了.
git repo

Memory allocator

The program user/kalloctest stresses xv6's memory allocator: three processes grow and shrink their address spaces, resulting in many calls to kalloc and kfree. kalloc and kfree obtain kmem.lock. kalloctest prints (as "#fetch-and-add") the number of loop iterations in acquire due to attempts to acquire a lock that another core already holds, for the kmem lock and a few other locks. The number of loop iterations in acquire is a rough measure of lock contention. The output of kalloctest looks similar to this before you complete the lab:

$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: bcache: #fetch-and-add 0 #acquire() 1260
--- top 5 contended locks:
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: proc: #fetch-and-add 23737 #acquire() 130718
lock: virtio_disk: #fetch-and-add 11159 #acquire() 114
lock: proc: #fetch-and-add 5937 #acquire() 130786
lock: proc: #fetch-and-add 4080 #acquire() 130786
tot= 83375
test1 FAIL
acquire maintains, for each lock, the count of calls to acquire for that lock, and the number of times the loop in acquire tried but failed to set the lock. kalloctest calls a system call that causes the kernel to print those counts for the kmem and bcache locks (which are the focus of this lab) and for the 5 most contended locks. If there is lock contention the number of acquire loop iterations will be large. The system call returns the sum of the number of loop iterations for the kmem and bcache locks.

For this lab, you must use a dedicated unloaded machine with multiple cores. If you use a machine that is doing other things, the counts that kalloctest prints will be nonsense. You can use a dedicated Athena workstation, or your own laptop, but don't use a dialup machine.

The root cause of lock contention in kalloctest is that kalloc() has a single free list, protected by a single lock. To remove lock contention, you will have to redesign the memory allocator to avoid a single lock and list. The basic idea is to maintain a free list per CPU, each list with its own lock. Allocations and frees on different CPUs can run in parallel, because each CPU will operate on a different list. The main challenge will be to deal with the case in which one CPU's free list is empty, but another CPU's list has free memory; in that case, the one CPU must "steal" part of the other CPU's free list. Stealing may introduce lock contention, but that will hopefully be infrequent.
  • 题目提示由于目前实现的kalloc所有的线程都共享一把锁, 此时如何所有的线程在heap上申请时会造成冲突, 从而都在锁上进行自旋, 从而进行等待,题目要求优化kalloc,减少lock accquire时的冲突. 题目也给了思路, 就是为每个线程建立一个memory pool, 每个线程的内存申请都发生在自身的memory pool, 当如果自身的memory pool用完时,则从其他线程的memory pool中进行steal操作.
  • 本身比较简单, 但是需要注意的几个细节:
    • kalloc模块对外提供的接口不变, 每次进行alloc操作时, 首先需要获取当前线程的cpuid的信息, 可以通过寄存器读取即可获取,xv6中已经封装好接口cpuid(),但是调用该接口时,需要关闭中断, 读取完成后, 还需要开启中断.
    • 进行steal操作时, 需要防止死锁.
    • 初始化时, 可以将所有未使用的内存都挂载在cpu 0上.

代码实现

  • memory初始化:
    struct {
    struct spinlock lock;
    struct run *freelist;
    int freesize;
    char name[16];
    } kmem[NCPU];

    void
    kinit()
    {
    for (int i = 0; i < NCPU; ++i) {
    snprintf(kmem[i].name, 16, "%s%d", "kmem", i);
    initlock(&kmem[i].lock, kmem[i].name);
    kmem[i].freelist = 0;
    kmem[i].freesize = 0;
    } // we wiil alloc each cup heap lock and list
    freerange(end, (void*)PHYSTOP);
    }
  • kfree操作:
    // Free the page of physical memory pointed at by v,
    // which normally should have been returned by a
    // call to kalloc(). (The exception is when
    // initializing the allocator; see kinit above.)
    void
    kfree(void *pa)
    {
    struct run *r;

    if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

    // Fill with junk to catch dangling refs.
    memset(pa, 1, PGSIZE);
    r = (struct run*)pa;

    //add this block to the free list
    push_off();
    int hart = cpuid();
    pop_off();

    acquire(&kmem[hart].lock);
    r->next = kmem[hart].freelist;
    kmem[hart].freelist = r;
    release(&kmem[hart].lock);
    }
  • kalloc操作:
    // Allocate one 4096-byte page of physical memory.
    // Returns a pointer that the kernel can use.
    // Returns 0 if the memory cannot be allocated.
    void *
    kalloc(void)
    {
    struct run *r;

    //add this block to the free list
    push_off();
    int hart = cpuid();
    pop_off();

    acquire(&kmem[hart].lock);
    r = kmem[hart].freelist;
    if(r) {
    kmem[hart].freelist = r->next;
    }
    release(&kmem[hart].lock);
    if(r == 0) {
    // we steal memory from other cpus
    for (int i = 0; i < NCPU; ++i) {
    acquire(&kmem[i].lock);
    r = kmem[i].freelist;
    if(r) {
    kmem[i].freelist = r->next;
    }
    release(&kmem[i].lock);
    if(r) break;
    }
    }

    if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
    return (void*)r;
    }

Buffer cache

This half of the assignment is independent from the first half; you can work on this half (and pass the tests) whether or not you have completed the first half.

If multiple processes use the file system intensively, they will likely contend for bcache.lock, which protects the disk block cache in kernel/bio.c. bcachetest creates several processes that repeatedly read different files in order to generate contention on bcache.lock; its output looks like this (before you complete this lab):

$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 0 #acquire() 33035
lock: bcache: #fetch-and-add 16142 #acquire() 65978
--- top 5 contended locks:
lock: virtio_disk: #fetch-and-add 162870 #acquire() 1188
lock: proc: #fetch-and-add 51936 #acquire() 73732
lock: bcache: #fetch-and-add 16142 #acquire() 65978
lock: uart: #fetch-and-add 7505 #acquire() 117
lock: proc: #fetch-and-add 6937 #acquire() 73420
tot= 16142
test0: FAIL
start test1
test1 OK
You will likely see different output, but the number of acquire loop iterations for the bcache lock will be high. If you look at the code in kernel/bio.c, you'll see that bcache.lock protects the list of cached block buffers, the reference count (b->refcnt) in each block buffer, and the identities of the cached blocks (b->dev and b->blockno).
  • 这个题目要求实现磁盘的cache的优化, 之前所有的设备请求都挂在一个链式的cahce上.所有针对磁盘的block请求读写全部共享一把锁, 这就必然会造成锁的请求冲突, 从而影响读写效率, 现在要求对读写的cache buffer进行优化, 从而降低锁的请求冲突, 从而提升效率.细节实现如下:
    • 题目中最关键的提示使用hash table,对应于不同的block请求通过hash映射到不同的buffer中, 从而降低锁的冲突. 当hash对应的bucket用完时, 则需要从别的bucket中未使用的buffersteal一些.实际采用链式hash即可, 即每个bucket存储的是bufferlist.
    • 题目中最需要值得处理的细节是, 即同一个devblock对应的buffer具有唯一性, 这样保证不同的线程读取或者写入的blcokcache buffer是同一个, 这样就能保证原子性, 否则则会出现读写不一致的问题, 这点也是本lab比较难处理的地方.
    • 具体实现时, 我们需要对hash table的每个bucket进行加锁操作, 当对该bucket操作时, 则需要获取锁, 操作完成后, 需要释放该锁.但是如何保证每个唯一的devblock对应的唯一性的buffer,则需要处理稍微麻烦一些, 首先我们查找该block对应的bucket是否对应其含有的devblock, 如果已经含有表示其已经进行缓存了, 则直接进行返回查找到的buffer即可; 否则我们先获取全局锁,再获取该bucket对应的锁, 再次再该bucket中进行查找一遍, 是否有存在该block对应的buffer缓存, 这样做的目的是为了防止, bucket释放锁到获取全局锁之间的这段时间中, 是否有其他的线程将该block对应的buffer进行了更新, 从而保证block对应的buffer的唯一性.我们在所有的bucket中找到一个最久且未使用的buffer分配给该block, 然后将该buffer移动到该block对应的bucket中.这样就完成了steal的操作.
    • 我们查找时优先从该bucket找到一个未使用的buffer, 如果未找到则从其他的bucket中进行查找即可.
    • 题目中很容易出的问题是会报panic: freeing free block的错误, 这个可能是之前的代码错误导致文件系统fs中的block存在错误, 解决办法是执行maka clean操作, 从新编译一遍文件系统fs即可.

代码实现:

  • 从新定义bcache的结构, 采用hashtable的方式存储buf信息.

    #define MAX_HASH_BUCKETS 13
    #define NULL 0
    extern uint ticks;

    struct {
    struct buf buf[NBUF];
    struct spinlock lock;
    // hash table
    struct spinlock hashlock[MAX_HASH_BUCKETS];
    struct buf hashtable[MAX_HASH_BUCKETS];
    // Linked list of all buffers, through prev/next.
    // Sorted by how recently the buffer was used.
    // head.next is most recent, head.prev is least.
    } bcache;
  • 初始化bcache操作, 此时我们需要初始化每个bucket对应的锁,并初始化每个bucket.

    void
    binit(void)
    {
    struct buf *b;
    // init buffer lock and init free list buffer
    // initial the hash table and hash lock
    for (int i = 0; i < MAX_HASH_BUCKETS; ++i) {
    initlock(&bcache.hashlock[i], "hashlock");
    // Create linked list of buffers
    bcache.hashtable[i].prev = &bcache.hashtable[i];
    bcache.hashtable[i].next = &bcache.hashtable[i];
    }
    initlock(&bcache.lock, "bcache");
    for(b = bcache.buf; b < bcache.buf + NBUF; b++){
    initsleeplock(&b->lock, "bufferlock");
    b->timestamp = 0;
    b->dev = -1;
    b->blockno = -1;
    b->refcnt = 0;
    b->next = bcache.hashtable[0].next;
    b->prev = &bcache.hashtable[0];
    bcache.hashtable[0].next->prev = b;
    bcache.hashtable[0].next = b;
    }
    }
  • 查找block对应的buffer时, 操作稍微麻烦一些. 我们首先在bucket中进行查找, 然后获取全局锁, 从其他的bucket中的查找一个未使用的buffer.

    // Look through buffer cache for block on device dev.
    // If not found, allocate a buffer.
    // In either case, return locked buffer.
    static struct buf*
    bget(uint dev, uint blockno)
    {
    struct buf *b;
    struct buf * lrub = 0;
    int minticks = -1;
    int no = bhash(blockno);

    acquire(&bcache.hashlock[no]);
    // Is the block already cached?
    for(b = bcache.hashtable[no].next; b != &bcache.hashtable[no]; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
    b->refcnt++;
    release(&bcache.hashlock[no]);
    acquiresleep(&b->lock);
    return b;
    }
    }
    release(&bcache.hashlock[no]);

    // Not cached.
    // Recycle the least recently used (LRU) unused buffer.
    acquire(&bcache.lock);
    acquire(&bcache.hashlock[no]);
    for(b = bcache.hashtable[no].next; b != &bcache.hashtable[no]; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
    b->refcnt++;
    release(&bcache.hashlock[no]);
    release(&bcache.lock);
    acquiresleep(&b->lock);
    return b;
    }
    if(b->refcnt == 0 && minticks >= b->timestamp) {
    lrub = b;
    minticks = b->timestamp;
    }
    }
    if (lrub) {
    lrub->dev = dev;
    lrub->blockno = blockno;
    lrub->valid = 0;
    lrub->refcnt = 1;
    release(&bcache.hashlock[no]);
    release(&bcache.lock);
    acquiresleep(&lrub->lock);
    return lrub;
    }

    // we steal a buffer block from other buckets
    for (int i = 1; i < MAX_HASH_BUCKETS; ++i) {
    int newno = bhash(no + i);
    acquire(&bcache.hashlock[newno]);
    for (b = bcache.hashtable[newno].prev; b != &bcache.hashtable[newno]; b = b->prev){
    if(b->refcnt == 0 && minticks >= b->timestamp) {
    lrub = b;
    minticks = b->timestamp;
    }
    }
    if (lrub) {
    lrub->dev = dev;
    lrub->blockno = blockno;
    lrub->valid = 0;
    lrub->refcnt = 1;
    lrub->next->prev = lrub->prev;
    lrub->prev->next = lrub->next;
    lrub->next = bcache.hashtable[no].next;
    lrub->prev = &bcache.hashtable[no];
    bcache.hashtable[no].next->prev = lrub;
    bcache.hashtable[no].next = lrub;
    release(&bcache.hashlock[newno]);
    release(&bcache.hashlock[no]);
    release(&bcache.lock);
    acquiresleep(&lrub->lock);
    return lrub;
    }
    release(&bcache.hashlock[newno]);
    }
    release(&bcache.hashlock[no]);
    release(&bcache.lock);

    // we check the the cache again and the one block must atomic
    panic("bget: no buffers");
    }
  • 释放buffer时, 则我们需要记录该buffertimestamp,用于lru算法.

    // Release a locked buffer.
    // Move to the head of the most-recently-used list.
    void
    brelse(struct buf *b)
    {
    if(!holdingsleep(&b->lock))
    panic("brelse");

    releasesleep(&b->lock);
    int no = bhash(b->blockno);
    acquire(&bcache.hashlock[no]);
    b->refcnt--;
    if (b->refcnt == 0) {
    b->timestamp = btime();
    }
    release(&bcache.hashlock[no]);
    // bdegub();
    }

    static int bhash(int blockno) {
    return blockno%MAX_HASH_BUCKETS;
    }

    static int btime(){
    return ticks;
    }

思考总结

  • 通过这一章的学习,其实给了许多可以思考的地方,从本质上思考锁的作用和原理,以及sleepwakeup的实现原理,非常简洁而又优雅的实现方式.
  • 下一步计划:
    • 完成option chanllengeproject;
    • 仔细研究一下无锁队列的实现,在xv6中添加无锁队列;

欢迎关注和打赏,感谢支持!

扫描二维码,分享此文章