lock
感觉lock lab
是自我感觉还算是比较难的lab
了, 第二个练习想了很久才想到如何写出正确的代码,特别是中间又莫名出现了 panic: freeing free blocks
, 找了很长时间都没有找到问题所在, 还好的是最终解决该问题, 将文件系统 fs
删除后就莫名奇妙好了, 所有的 testcase
全部都可以通过了, 后面需要继续研究一下 lock free
队列的实现.
- 本章终点讲述操作系统中
sleep
和wakeup
的实现原理, 感觉还是挺有趣的实现, 当然本质上sleep
的实现原理非常简单, 即将当前线程设置为sleeping
状态, 并标记当前线程的 $chan$, 然后将当前线程切换出去, 线程调度程序在运行时,发现当前线程的状态不是runable
时就不会将该线程调度到cpu
核心上, 从而实现了该线程的睡眠.void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
if(lk != &p->lock){ //DOC: sleeplock0
acquire(&p->lock); //DOC: sleeplock1
release(lk);
}
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
if(lk != &p->lock){
release(&p->lock);
acquire(lk);
}
} - 我们同时也可以观察一下系统调用程序中
sleep
函数的实现, 原理实际上也是调用sleep lock
来实现, 当前如果等待的时间小于sleep
睡眠的时间,如果当前进程被唤醒,则继续执行sleep
, 否则则释放ticklocks
, 此时进行就可以从调度程序中被唤醒.sleep
系统调用都是标记在ticks
中,我们可以看到时间中断中,每次会将标记为ticks
的进程进行唤醒,从而实现时间计数.uint64
sys_sleep(void)
{
int n;
uint ticks0;
if(argint(0, &n) < 0)
return -1;
acquire(&tickslock);
ticks0 = ticks;
while(ticks - ticks0 < n){
if(myproc()->killed){
release(&tickslock);
return -1;
}
sleep(&ticks, &tickslock);
}
release(&tickslock);
return 0;
}
void
clockintr()
{
acquire(&tickslock);
ticks++;
wakeup(&ticks);
release(&tickslock);
} - 关于
wake
的原理实现也非常简单,即将进程的状态由sleeping
设置为runable
即可, 此时在进行进程调度时,该进程即可就唤醒,加载到CPU
核心中运行.wakeup
时需要传入chan
参数标记.void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
} lecture
中还提到了lost wakeup
的问题, 很多细节问题真心值得深入的去思考这些细节实现. 还是认真的学习视频感觉比较.- 本章的
lab
主要以如何减少lock
冲突的出发点去优化代码, 同时如何避免死锁的问题去寻求解决办法, 总的来说还算是比较有意思的lab
.
实现代码
本次lab
的几个程序倒是感觉不是太难,实现代码倒是不多,但是很多值得思考的地方,整个MIT 6.S081的课程总结全部放到github上了.
git repo
Memory allocator
The program user/kalloctest stresses xv6's memory allocator: three processes grow and shrink their address spaces, resulting in many calls to kalloc and kfree. kalloc and kfree obtain kmem.lock. kalloctest prints (as "#fetch-and-add") the number of loop iterations in acquire due to attempts to acquire a lock that another core already holds, for the kmem lock and a few other locks. The number of loop iterations in acquire is a rough measure of lock contention. The output of kalloctest looks similar to this before you complete the lab: |
- 题目提示由于目前实现的
kalloc
所有的线程都共享一把锁, 此时如何所有的线程在heap
上申请时会造成冲突, 从而都在锁上进行自旋, 从而进行等待,题目要求优化kalloc
,减少lock accquire
时的冲突. 题目也给了思路, 就是为每个线程建立一个memory pool
, 每个线程的内存申请都发生在自身的memory pool
, 当如果自身的memory pool
用完时,则从其他线程的memory pool
中进行steal
操作. - 本身比较简单, 但是需要注意的几个细节:
kalloc
模块对外提供的接口不变, 每次进行alloc
操作时, 首先需要获取当前线程的cpuid
的信息, 可以通过寄存器读取即可获取,xv6
中已经封装好接口cpuid()
,但是调用该接口时,需要关闭中断, 读取完成后, 还需要开启中断.- 进行
steal
操作时, 需要防止死锁. - 初始化时, 可以将所有未使用的内存都挂载在
cpu 0
上.
代码实现
memory
初始化:struct {
struct spinlock lock;
struct run *freelist;
int freesize;
char name[16];
} kmem[NCPU];
void
kinit()
{
for (int i = 0; i < NCPU; ++i) {
snprintf(kmem[i].name, 16, "%s%d", "kmem", i);
initlock(&kmem[i].lock, kmem[i].name);
kmem[i].freelist = 0;
kmem[i].freesize = 0;
} // we wiil alloc each cup heap lock and list
freerange(end, (void*)PHYSTOP);
}kfree
操作:// Free the page of physical memory pointed at by v,
// which normally should have been returned by a
// call to kalloc(). (The exception is when
// initializing the allocator; see kinit above.)
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
//add this block to the free list
push_off();
int hart = cpuid();
pop_off();
acquire(&kmem[hart].lock);
r->next = kmem[hart].freelist;
kmem[hart].freelist = r;
release(&kmem[hart].lock);
}kalloc
操作:// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
struct run *r;
//add this block to the free list
push_off();
int hart = cpuid();
pop_off();
acquire(&kmem[hart].lock);
r = kmem[hart].freelist;
if(r) {
kmem[hart].freelist = r->next;
}
release(&kmem[hart].lock);
if(r == 0) {
// we steal memory from other cpus
for (int i = 0; i < NCPU; ++i) {
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if(r) {
kmem[i].freelist = r->next;
}
release(&kmem[i].lock);
if(r) break;
}
}
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
Buffer cache
This half of the assignment is independent from the first half; you can work on this half (and pass the tests) whether or not you have completed the first half. |
- 这个题目要求实现磁盘的
cache
的优化, 之前所有的设备请求都挂在一个链式的cahce
上.所有针对磁盘的block
请求读写全部共享一把锁, 这就必然会造成锁的请求冲突, 从而影响读写效率, 现在要求对读写的cache buffer
进行优化, 从而降低锁的请求冲突, 从而提升效率.细节实现如下:- 题目中最关键的提示使用
hash table
,对应于不同的block
请求通过hash
映射到不同的buffer
中, 从而降低锁的冲突. 当hash
对应的bucket
用完时, 则需要从别的bucket
中未使用的buffer
中steal
一些.实际采用链式hash
即可, 即每个bucket
存储的是buffer
的list
. - 题目中最需要值得处理的细节是, 即同一个
dev
和block
对应的buffer
具有唯一性, 这样保证不同的线程读取或者写入的blcok
的cache buffer
是同一个, 这样就能保证原子性, 否则则会出现读写不一致的问题, 这点也是本lab
比较难处理的地方. - 具体实现时, 我们需要对
hash table
的每个bucket
进行加锁操作, 当对该bucket
操作时, 则需要获取锁, 操作完成后, 需要释放该锁.但是如何保证每个唯一的dev
和block
对应的唯一性的buffer
,则需要处理稍微麻烦一些, 首先我们查找该block
对应的bucket
是否对应其含有的dev
和block
, 如果已经含有表示其已经进行缓存了, 则直接进行返回查找到的buffer
即可; 否则我们先获取全局锁,再获取该bucket
对应的锁, 再次再该bucket
中进行查找一遍, 是否有存在该block
对应的buffer
缓存, 这样做的目的是为了防止,bucket
释放锁到获取全局锁之间的这段时间中, 是否有其他的线程将该block
对应的buffer
进行了更新, 从而保证block
对应的buffer
的唯一性.我们在所有的bucket
中找到一个最久且未使用的buffer
分配给该block
, 然后将该buffer
移动到该block
对应的bucket
中.这样就完成了steal
的操作. - 我们查找时优先从该
bucket
找到一个未使用的buffer
, 如果未找到则从其他的bucket
中进行查找即可. - 题目中很容易出的问题是会报
panic: freeing free block
的错误, 这个可能是之前的代码错误导致文件系统fs
中的block
存在错误, 解决办法是执行maka clean
操作, 从新编译一遍文件系统fs
即可.
- 题目中最关键的提示使用
代码实现:
从新定义
bcache
的结构, 采用hashtable
的方式存储buf
信息.
extern uint ticks;
struct {
struct buf buf[NBUF];
struct spinlock lock;
// hash table
struct spinlock hashlock[MAX_HASH_BUCKETS];
struct buf hashtable[MAX_HASH_BUCKETS];
// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
} bcache;初始化
bcache
操作, 此时我们需要初始化每个bucket
对应的锁,并初始化每个bucket
.void
binit(void)
{
struct buf *b;
// init buffer lock and init free list buffer
// initial the hash table and hash lock
for (int i = 0; i < MAX_HASH_BUCKETS; ++i) {
initlock(&bcache.hashlock[i], "hashlock");
// Create linked list of buffers
bcache.hashtable[i].prev = &bcache.hashtable[i];
bcache.hashtable[i].next = &bcache.hashtable[i];
}
initlock(&bcache.lock, "bcache");
for(b = bcache.buf; b < bcache.buf + NBUF; b++){
initsleeplock(&b->lock, "bufferlock");
b->timestamp = 0;
b->dev = -1;
b->blockno = -1;
b->refcnt = 0;
b->next = bcache.hashtable[0].next;
b->prev = &bcache.hashtable[0];
bcache.hashtable[0].next->prev = b;
bcache.hashtable[0].next = b;
}
}查找
block
对应的buffer
时, 操作稍微麻烦一些. 我们首先在bucket
中进行查找, 然后获取全局锁, 从其他的bucket
中的查找一个未使用的buffer
.// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;
struct buf * lrub = 0;
int minticks = -1;
int no = bhash(blockno);
acquire(&bcache.hashlock[no]);
// Is the block already cached?
for(b = bcache.hashtable[no].next; b != &bcache.hashtable[no]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.hashlock[no]);
acquiresleep(&b->lock);
return b;
}
}
release(&bcache.hashlock[no]);
// Not cached.
// Recycle the least recently used (LRU) unused buffer.
acquire(&bcache.lock);
acquire(&bcache.hashlock[no]);
for(b = bcache.hashtable[no].next; b != &bcache.hashtable[no]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.hashlock[no]);
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
if(b->refcnt == 0 && minticks >= b->timestamp) {
lrub = b;
minticks = b->timestamp;
}
}
if (lrub) {
lrub->dev = dev;
lrub->blockno = blockno;
lrub->valid = 0;
lrub->refcnt = 1;
release(&bcache.hashlock[no]);
release(&bcache.lock);
acquiresleep(&lrub->lock);
return lrub;
}
// we steal a buffer block from other buckets
for (int i = 1; i < MAX_HASH_BUCKETS; ++i) {
int newno = bhash(no + i);
acquire(&bcache.hashlock[newno]);
for (b = bcache.hashtable[newno].prev; b != &bcache.hashtable[newno]; b = b->prev){
if(b->refcnt == 0 && minticks >= b->timestamp) {
lrub = b;
minticks = b->timestamp;
}
}
if (lrub) {
lrub->dev = dev;
lrub->blockno = blockno;
lrub->valid = 0;
lrub->refcnt = 1;
lrub->next->prev = lrub->prev;
lrub->prev->next = lrub->next;
lrub->next = bcache.hashtable[no].next;
lrub->prev = &bcache.hashtable[no];
bcache.hashtable[no].next->prev = lrub;
bcache.hashtable[no].next = lrub;
release(&bcache.hashlock[newno]);
release(&bcache.hashlock[no]);
release(&bcache.lock);
acquiresleep(&lrub->lock);
return lrub;
}
release(&bcache.hashlock[newno]);
}
release(&bcache.hashlock[no]);
release(&bcache.lock);
// we check the the cache again and the one block must atomic
panic("bget: no buffers");
}释放
buffer
时, 则我们需要记录该buffer
的timestamp
,用于lru
算法.// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
int no = bhash(b->blockno);
acquire(&bcache.hashlock[no]);
b->refcnt--;
if (b->refcnt == 0) {
b->timestamp = btime();
}
release(&bcache.hashlock[no]);
// bdegub();
}
static int bhash(int blockno) {
return blockno%MAX_HASH_BUCKETS;
}
static int btime(){
return ticks;
}
思考总结
- 通过这一章的学习,其实给了许多可以思考的地方,从本质上思考锁的作用和原理,以及
sleep
和wakeup
的实现原理,非常简洁而又优雅的实现方式. - 下一步计划:
- 完成
option chanllenge
的project
;
- 完成
- 仔细研究一下无锁队列的实现,在
xv6
中添加无锁队列;
- 仔细研究一下无锁队列的实现,在
欢迎关注和打赏,感谢支持!
- 关注我的博客: http://mikemeng.org/
- 关注我的知乎:https://www.zhihu.com/people/da-hua-niu
- 关注我的微信公众号: 公务程序猿
扫描二维码,分享此文章