lock
The lock lab felt like one of the harder labs so far. The second exercise took me a long time before I figured out how to write correct code, and midway through I hit a mysterious panic: freeing free blocks that I spent ages failing to track down. In the end it resolved itself after I deleted the file system fs image and rebuilt: everything inexplicably worked and all the test cases passed. Next I want to keep studying lock-free queue implementations.
- This chapter focuses on how sleep and wakeup are implemented in the operating system, which I found quite interesting. At its core the idea behind sleep is very simple: set the current thread's state to SLEEPING, record the channel chan it is waiting on, and switch the thread out; when the scheduler runs, it will not place a thread whose state is not RUNNABLE onto a CPU core, and so the thread sleeps.

void
sleep(void *chan, struct spinlock *lk)
{
  struct proc *p = myproc();

  // Must acquire p->lock in order to
  // change p->state and then call sched.
  // Once we hold p->lock, we can be
  // guaranteed that we won't miss any wakeup
  // (wakeup locks p->lock),
  // so it's okay to release lk.
  if(lk != &p->lock){  //DOC: sleeplock0
    acquire(&p->lock);  //DOC: sleeplock1
    release(lk);
  }

  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  if(lk != &p->lock){
    release(&p->lock);
    acquire(lk);
  }
}

- We can also look at the implementation of the sleep system call, which is built on this same sleep mechanism: while the elapsed time is still less than the requested sleep time, the process goes back to sleep whenever it is woken; once enough time has passed, it releases tickslock and returns. sleep system calls all wait on the ticks channel, and in the timer interrupt we can see that every tick wakes the processes sleeping on ticks, which is how time is counted.

uint64
sys_sleep(void)
{
  int n;
  uint ticks0;

  if(argint(0, &n) < 0)
    return -1;
  acquire(&tickslock);
  ticks0 = ticks;
  while(ticks - ticks0 < n){
    if(myproc()->killed){
      release(&tickslock);
      return -1;
    }
    sleep(&ticks, &tickslock);
  }
  release(&tickslock);
  return 0;
}
void
clockintr()
{
  acquire(&tickslock);
  ticks++;
  wakeup(&ticks);
  release(&tickslock);
}

- The implementation of wakeup is just as simple: flip any process that is SLEEPING on the given channel from SLEEPING to RUNNABLE; the next time the scheduler runs, that process can be loaded onto a CPU core and resume. The caller passes the chan argument to identify which sleepers to wake.

void
wakeup(void *chan)
{
  struct proc *p;

  for(p = proc; p < &proc[NPROC]; p++) {
    acquire(&p->lock);
    if(p->state == SLEEPING && p->chan == chan) {
      p->state = RUNNABLE;
    }
    release(&p->lock);
  }
}

The lecture also covered the lost wakeup problem; details like these are genuinely worth thinking through, and the lecture videos repay careful study.

- This chapter's lab is mainly about optimizing code to reduce lock contention, and about finding ways to avoid deadlock along the way; overall it is a fairly interesting lab.
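The lost-wakeup discipline (hold the lock, change the condition, re-check it in a loop) has a direct user-level analogue in pthread condition variables. Below is a minimal sketch of the sys_sleep pattern with hypothetical names and a simulated clock thread; it is illustrative, not xv6 code:

```c
#include <pthread.h>

static pthread_mutex_t tickslock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  tickscond = PTHREAD_COND_INITIALIZER;
static int ticks = 0;

// like clockintr(): advance time and wake every sleeper on ticks
static void *clock_thread(void *arg) {
    (void)arg;
    for (int i = 0; i < 5; i++) {
        pthread_mutex_lock(&tickslock);
        ticks++;
        pthread_cond_broadcast(&tickscond);   // wakeup(&ticks)
        pthread_mutex_unlock(&tickslock);
    }
    return 0;
}

// like sys_sleep(n), for n <= 5 here: block until n ticks have elapsed.
// cond_wait atomically releases the lock and sleeps, so a wakeup that
// happens between the check and the sleep cannot be lost.
int sleep_ticks(int n) {
    pthread_mutex_lock(&tickslock);
    int ticks0 = ticks;
    pthread_mutex_unlock(&tickslock);

    pthread_t t;
    pthread_create(&t, 0, clock_thread, 0);

    pthread_mutex_lock(&tickslock);
    while (ticks - ticks0 < n)                // condition re-checked after every wakeup
        pthread_cond_wait(&tickscond, &tickslock);
    int elapsed = ticks - ticks0;
    pthread_mutex_unlock(&tickslock);
    pthread_join(t, 0);
    return elapsed;
}
```

If the while loop were replaced by a single if, a wakeup arriving between the check and the wait would be lost forever; the loop plus the lock is exactly what sleep/wakeup enforce in the kernel.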
Implementation
The programs in this lab are not too hard and the code is not long, but there is plenty worth thinking about. My full notes for the MIT 6.S081 course are all on GitHub.
git repo
Memory allocator
The program user/kalloctest stresses xv6's memory allocator: three processes grow and shrink their address spaces, resulting in many calls to kalloc and kfree. kalloc and kfree obtain kmem.lock. kalloctest prints (as "#fetch-and-add") the number of loop iterations in acquire due to attempts to acquire a lock that another core already holds, for the kmem lock and a few other locks. The number of loop iterations in acquire is a rough measure of lock contention. The output of kalloctest looks similar to this before you complete the lab:
- The hints point out that the current kalloc shares one lock among all CPUs, so concurrent heap allocations all contend on it and spin while waiting. The task is to optimize kalloc to reduce contention in acquire. The suggested approach is a per-CPU memory pool: each CPU allocates from its own pool, and when its own pool runs out it steals from another CPU's pool.
- The change itself is simple, but a few details deserve attention:
  - The external interface of the kalloc module stays the same. On each allocation we first need the current CPU id, read from a register via the cpuid() helper that xv6 already provides; interrupts must be disabled around the call and re-enabled afterwards.
  - The steal operation must be written so it cannot deadlock.
  - At initialization, all unused memory can simply be hung on cpu 0's free list.
Code
Memory initialization:

struct {
  struct spinlock lock;
  struct run *freelist;
  int freesize;
  char name[16];
} kmem[NCPU];

void
kinit()
{
  // set up a per-CPU heap lock and free list
  for (int i = 0; i < NCPU; ++i) {
    snprintf(kmem[i].name, 16, "%s%d", "kmem", i);
    initlock(&kmem[i].lock, kmem[i].name);
    kmem[i].freelist = 0;
    kmem[i].freesize = 0;
  }
  freerange(end, (void*)PHYSTOP);
}

The kfree operation:

// Free the page of physical memory pointed at by v,
// which normally should have been returned by a
// call to kalloc(). (The exception is when
// initializing the allocator; see kinit above.)
void
kfree(void *pa)
{
  struct run *r;

  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);

  r = (struct run*)pa;

  // add this block to the current CPU's free list
  push_off();
  int hart = cpuid();
  pop_off();
  acquire(&kmem[hart].lock);
  r->next = kmem[hart].freelist;
  kmem[hart].freelist = r;
  release(&kmem[hart].lock);
}

The kalloc operation:

// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
  struct run *r;

  // try the current CPU's free list first
  push_off();
  int hart = cpuid();
  pop_off();
  acquire(&kmem[hart].lock);
  r = kmem[hart].freelist;
  if(r) {
    kmem[hart].freelist = r->next;
  }
  release(&kmem[hart].lock);

  if(r == 0) {
    // we steal memory from other cpus
    for (int i = 0; i < NCPU; ++i) {
      acquire(&kmem[i].lock);
      r = kmem[i].freelist;
      if(r) {
        kmem[i].freelist = r->next;
      }
      release(&kmem[i].lock);
      if(r) break;
    }
  }

  if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
  return (void*)r;
}
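The per-pool-with-steal idea above can be tested in isolation at user level. The sketch below uses hypothetical names and a fixed pool of blocks split across shards; it is single-threaded, whereas the kernel version wraps each list in a spinlock:

```c
#include <stddef.h>

// Hypothetical user-level model of the per-CPU allocator: each shard owns
// a singly linked free list; alloc tries the local shard first and steals
// from other shards when the local list is empty.
#define NSHARD 4
#define NBLOCK 16

struct run { struct run *next; };

static struct run blocks[NBLOCK];
static struct run *freelist[NSHARD];

void pool_init(void) {
    for (int i = 0; i < NSHARD; i++) freelist[i] = 0;
    for (int i = 0; i < NBLOCK; i++) {   // hang everything on shard 0,
        blocks[i].next = freelist[0];    // like freerange() onto cpu 0
        freelist[0] = &blocks[i];
    }
}

void *pool_alloc(int shard) {
    struct run *r = freelist[shard];
    if (r) { freelist[shard] = r->next; return r; }
    for (int i = 0; i < NSHARD; i++) {   // local list empty: steal
        r = freelist[i];
        if (r) { freelist[i] = r->next; return r; }
    }
    return 0;                            // out of memory
}

void pool_free(int shard, void *p) {
    struct run *r = p;                   // freed block goes to the freeing shard
    r->next = freelist[shard];
    freelist[shard] = r;
}
```

Frees land on the freeing shard's list, so over time pages migrate toward the CPUs that actually use them, which is what keeps contention low.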
Buffer cache
This half of the assignment is independent from the first half; you can work on this half (and pass the tests) whether or not you have completed the first half.
- This task asks us to optimize the disk cache. Previously all device requests hung off a single linked cache, and every block read and write shared one lock, which inevitably causes lock contention and hurts I/O throughput. The assignment is to restructure the cache buffers to reduce lock contention and improve efficiency. The details:
  - The key hint is to use a hash table: different block requests hash to different buckets, which lowers lock contention. When a bucket runs out of free buffers, we steal unused buffers from other buckets. A chained hash table suffices, i.e. each bucket stores a list of buffers.
  - The detail that needs the most care is that the buffer for a given dev and blockno must be unique, so that all threads reading or writing a block share the same cached buffer; this preserves atomicity, otherwise reads and writes become inconsistent. This is the hardest part of the lab.
  - Concretely, each bucket of the hash table gets its own lock, acquired before operating on that bucket and released afterwards. Keeping the dev/blockno-to-buffer mapping unique takes more work: first search the block's bucket; if a buffer with matching dev and blockno is found, the block is already cached and we simply return it. Otherwise we take the global lock, then the bucket's lock, and search the bucket once more; this re-check guards against another thread having inserted a buffer for this block between our releasing the bucket lock and acquiring the global lock, and so preserves uniqueness. We then find the least recently used unused buffer among all the buckets, assign it to this block, and move it into the block's bucket. That completes the steal operation.
  - When searching for a free buffer, look in the block's own bucket first; only if none is found do we search the other buckets.
  - An easy pitfall here is the error panic: freeing free block. It can be caused by earlier buggy code leaving bad blocks in the file system fs image; the fix is to run make clean and recompile so the fs image is regenerated.
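The uniqueness protocol described above (per-bucket locks for the fast path, a global lock plus a re-scan for the slow path) can be distilled into a small user-level get-or-create cache. All names below are hypothetical; this is a sketch of the pattern, not the xv6 code:

```c
#include <pthread.h>
#include <string.h>

#define NBUCKET 13
#define BUCKETCAP 8

struct entry { int key; int used; };

static struct {
    pthread_mutex_t global;                    // serializes the create path
    pthread_mutex_t bucketlock[NBUCKET];       // one lock per bucket
    struct entry bucket[NBUCKET][BUCKETCAP];
} cache = { PTHREAD_MUTEX_INITIALIZER };

void cache_init(void) {
    for (int i = 0; i < NBUCKET; i++)
        pthread_mutex_init(&cache.bucketlock[i], 0);
    memset(cache.bucket, 0, sizeof cache.bucket);
}

static struct entry *scan(int no, int key) {
    for (int i = 0; i < BUCKETCAP; i++)
        if (cache.bucket[no][i].used && cache.bucket[no][i].key == key)
            return &cache.bucket[no][i];
    return 0;
}

// Return the unique entry for key, creating it if needed.
struct entry *cache_get(int key) {
    int no = key % NBUCKET;
    pthread_mutex_lock(&cache.bucketlock[no]);
    struct entry *e = scan(no, key);           // fast path: already cached?
    pthread_mutex_unlock(&cache.bucketlock[no]);
    if (e) return e;
    pthread_mutex_lock(&cache.global);         // slow path: one creator at a time
    pthread_mutex_lock(&cache.bucketlock[no]);
    e = scan(no, key);                         // re-check: another thread may have won
    if (!e) {
        for (int i = 0; i < BUCKETCAP && !e; i++)
            if (!cache.bucket[no][i].used) {
                cache.bucket[no][i].used = 1;
                cache.bucket[no][i].key = key;
                e = &cache.bucket[no][i];
            }
    }
    pthread_mutex_unlock(&cache.bucketlock[no]);
    pthread_mutex_unlock(&cache.global);
    return e;
}
```

Without the re-scan under the global lock, two threads could both miss in the fast path and each create its own entry for the same key, which is exactly the duplicate-buffer bug the lab warns about.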
Code:
Redefine the bcache structure to store the buf entries in a hash table:
extern uint ticks;

struct {
  struct buf buf[NBUF];
  struct spinlock lock;
  // hash table
  struct spinlock hashlock[MAX_HASH_BUCKETS];
  struct buf hashtable[MAX_HASH_BUCKETS];
  // Linked list of all buffers, through prev/next.
  // Sorted by how recently the buffer was used.
  // head.next is most recent, head.prev is least.
} bcache;

Initializing bcache: we need to initialize each bucket's lock and each bucket's list head.

void
binit(void)
{
  struct buf *b;

  // initialize the hash table and the per-bucket locks
  for (int i = 0; i < MAX_HASH_BUCKETS; ++i) {
    initlock(&bcache.hashlock[i], "hashlock");
    // Create linked list of buffers
    bcache.hashtable[i].prev = &bcache.hashtable[i];
    bcache.hashtable[i].next = &bcache.hashtable[i];
  }
  initlock(&bcache.lock, "bcache");
  // hang every buffer on bucket 0 to start with
  for(b = bcache.buf; b < bcache.buf + NBUF; b++){
    initsleeplock(&b->lock, "bufferlock");
    b->timestamp = 0;
    b->dev = -1;
    b->blockno = -1;
    b->refcnt = 0;
    b->next = bcache.hashtable[0].next;
    b->prev = &bcache.hashtable[0];
    bcache.hashtable[0].next->prev = b;
    bcache.hashtable[0].next = b;
  }
}

Looking up the buffer for a block is a little more involved: we first search the block's bucket, then take the global lock and look for an unused buffer, possibly stealing one from another bucket.

// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;
  struct buf *lrub = 0;
  int minticks = -1;
  int no = bhash(blockno);

  acquire(&bcache.hashlock[no]);
  // Is the block already cached?
  for(b = bcache.hashtable[no].next; b != &bcache.hashtable[no]; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&bcache.hashlock[no]);
      acquiresleep(&b->lock);
      return b;
    }
  }
  release(&bcache.hashlock[no]);

  // Not cached.
  // Recycle the least recently used (LRU) unused buffer.
  acquire(&bcache.lock);
  acquire(&bcache.hashlock[no]);
  for(b = bcache.hashtable[no].next; b != &bcache.hashtable[no]; b = b->next){
    // re-check the bucket: another thread may have cached
    // this block before we took the global lock
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&bcache.hashlock[no]);
      release(&bcache.lock);
      acquiresleep(&b->lock);
      return b;
    }
    if(b->refcnt == 0 && minticks >= b->timestamp) {
      lrub = b;
      minticks = b->timestamp;
    }
  }
  if (lrub) {
    lrub->dev = dev;
    lrub->blockno = blockno;
    lrub->valid = 0;
    lrub->refcnt = 1;
    release(&bcache.hashlock[no]);
    release(&bcache.lock);
    acquiresleep(&lrub->lock);
    return lrub;
  }
  // we steal a buffer block from other buckets
  for (int i = 1; i < MAX_HASH_BUCKETS; ++i) {
    int newno = bhash(no + i);
    acquire(&bcache.hashlock[newno]);
    for (b = bcache.hashtable[newno].prev; b != &bcache.hashtable[newno]; b = b->prev){
      if(b->refcnt == 0 && minticks >= b->timestamp) {
        lrub = b;
        minticks = b->timestamp;
      }
    }
    if (lrub) {
      lrub->dev = dev;
      lrub->blockno = blockno;
      lrub->valid = 0;
      lrub->refcnt = 1;
      // move the stolen buffer into this block's bucket
      lrub->next->prev = lrub->prev;
      lrub->prev->next = lrub->next;
      lrub->next = bcache.hashtable[no].next;
      lrub->prev = &bcache.hashtable[no];
      bcache.hashtable[no].next->prev = lrub;
      bcache.hashtable[no].next = lrub;
      release(&bcache.hashlock[newno]);
      release(&bcache.hashlock[no]);
      release(&bcache.lock);
      acquiresleep(&lrub->lock);
      return lrub;
    }
    release(&bcache.hashlock[newno]);
  }
  release(&bcache.hashlock[no]);
  release(&bcache.lock);
  panic("bget: no buffers");
}

When releasing a buffer, we record its timestamp, which the LRU recycling in bget uses.

// Release a locked buffer.
// Record the release time so bget can recycle the
// least recently used unused buffer.
void
brelse(struct buf *b)
{
  if(!holdingsleep(&b->lock))
    panic("brelse");

  releasesleep(&b->lock);

  int no = bhash(b->blockno);
  acquire(&bcache.hashlock[no]);
  b->refcnt--;
  if (b->refcnt == 0) {
    b->timestamp = btime();
  }
  release(&bcache.hashlock[no]);
}
static int bhash(int blockno) {
  return blockno % MAX_HASH_BUCKETS;
}

static int btime() {
  return ticks;
}
Reflections
- This chapter gave me plenty to think about: the purpose and inner workings of locks, and the sleep/wakeup mechanism, a remarkably simple and elegant design.
- Next steps:
  - finish the optional challenge projects;
  - study lock-free queue implementations carefully and add a lock-free queue to xv6.
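As a starting point for that plan, a single-producer/single-consumer lock-free ring buffer can be sketched with C11 atomics. This is an illustrative user-level sketch, not xv6 code (the kernel would need its own atomic primitives), and the names are hypothetical:

```c
#include <stdatomic.h>

#define QSIZE 8   // capacity; must be a power of two

struct spsc {
    int buf[QSIZE];
    atomic_uint head;   // written only by the consumer
    atomic_uint tail;   // written only by the producer
};

void spsc_init(struct spsc *q) {
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
}

// Returns 1 on success, 0 if the queue is full.
int spsc_push(struct spsc *q, int v) {
    unsigned tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    unsigned head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (tail - head == QSIZE)
        return 0;                      // full
    q->buf[tail % QSIZE] = v;
    // release: the store to buf[] must be visible before the new tail
    atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
    return 1;
}

// Returns 1 on success (*out receives the value), 0 if empty.
int spsc_pop(struct spsc *q, int *out) {
    unsigned head = atomic_load_explicit(&q->head, memory_order_relaxed);
    unsigned tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (tail == head)
        return 0;                      // empty
    *out = q->buf[head % QSIZE];
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return 1;
}
```

Because each index has exactly one writer, no compare-and-swap loop is needed; the acquire/release pairs are enough to order the data against the index updates.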
Follows and tips are welcome. Thanks for the support!
- My blog: http://mikemeng.org/
- My Zhihu: https://www.zhihu.com/people/da-hua-niu
- My WeChat official account: 公务程序猿

Scan the QR code to share this article.