做资讯类网站百度查一下
接上文:
https://blog.csdn.net/dog250/article/details/108349046
在这篇文章中,我给出了一个拯救panic的方法,其目的更多的是恶作剧性质。但仍然有不足,请看下面代码段中的注释:
void stub_panic(const char *fmt, ...)
{...local_irq_enable();// 这个时候如果current持有自旋锁,那可怎么办???printk("rq:%d %d\n", preempt_count(), irqs_disabled());__set_current_state(TASK_UNINTERRUPTIBLE);schedule();
如果持有自旋锁的task被schedule出去了,由于该task不会再回来了,那么只要另一个task抢锁,系统立马就会死锁。
需求自然就出来了,我能不能在调用schedule之前遍历系统当前所有自旋锁的持锁情况,将自己持有的自旋锁给unlock了呢?
有需求就有方案。当然可以。
大秀hook手艺的时候来了。虽然作为手艺人用手工的方式拼接指令可以实现任何功能,但现在的目标已经不仅仅是秀手艺了,而是实现上述的需求,所以我尽量先使用stap/kprobe,ftrace这些场面还不是太宏大的工具。
需求图示如下:
很显然,直接的思路就是使用kretprobe了,于是我写出了如下的代码:
#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/kprobes.h>struct lock_owner {struct raw_spinlock *lock;struct task_struct *owner;struct task_struct *lock_owner;int idx;
};struct lock_owner plane[256];
unsigned long bitmap[4];int lock_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{struct lock_owner *owner, **pdata;struct raw_spinlock *lock = (struct raw_spinlock *)regs->di;int i = -1, retry = 10;pdata = (struct lock_owner **)&ri->data;*pdata = NULL;again:i = find_first_zero_bit(&bitmap[0], 256);if (i > 256 || test_and_set_bit(i, bitmap)) {if (retry --)goto again;goto end;}owner->idx = i;owner = &plane[i];owner->lock = lock;owner->owner = current;owner->lock_owner = NULL;*pdata = owner;
end:return 0;
}int lock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{struct lock_owner *owner = (struct lock_owner *)&ri->data[0];if (owner == NULL)return 0;owner->lock_owner = current;return 0;
}static struct kretprobe lock_kretprobe = {.handler = lock_ret_handler,.entry_handler = lock_handler,.data_size = sizeof(struct lock_owner *),.maxactive = 20,
};int unlock_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{struct lock_owner *owner, **pdata;struct raw_spinlock *lock = (struct raw_spinlock *)regs->di;int i = -1;pdata = (struct lock_owner **)&ri->data;*pdata = NULL;for (i = 0; i < 256; i++) {owner = &plane[i];if (test_bit(owner->idx, &bitmap[0]) &&owner->owner == current &&owner->lock_owner == current &&owner->lock == lock &&owner->idx == i) {*pdata = owner;break;}}return 0;
}int unlock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{struct lock_owner *owner = (struct lock_owner *)&ri->data[0];if (owner == NULL)return 0;owner->lock = NULL;owner->owner = NULL;owner->lock_owner = NULL;owner->idx = -1;clear_bit(owner->idx, &bitmap[0]);return 0;
}static struct kretprobe unlock_kretprobe = {.handler = unlock_ret_handler,.entry_handler = unlock_handler,.data_size = sizeof(struct lock_owner *),.maxactive = 20,
};static int __init lock_detect_init(void)
{memset((char *)&plane[0], 0, sizeof(plane));memset((char *)&bitmap[0], 0, sizeof(plane));lock_kretprobe.kp.symbol_name = "_raw_spin_lock";register_kretprobe(&lock_kretprobe);unlock_kretprobe.kp.symbol_name = "_raw_spin_unlock";register_kretprobe(&unlock_kretprobe);return 0;
}static void __exit lock_detect_exit(void)
{unregister_kretprobe(&lock_kretprobe);unregister_kretprobe(&unlock_kretprobe);
}module_init(lock_detect_init);
module_exit(lock_detect_exit);
请忽略具体的spinlock统计逻辑,现在仅仅关注框架,我敢说,这个玩法对于一般的wrap函数,简直就是模版:
- 这个框架可以实现几乎一切wrap函数。
所谓的wrap函数其实很简单:
my_function(void *param)
{pre_process(param);orig_function(param);post_process(param);
}
kretprobe的跳转逻辑如下:
然而,它偏偏不能用于_raw_spin_lock/_raw_spin_unlock的wrap!因为会死锁!
- kprobe/kretprobe框架内部使用spinlock来进行同步,在ret_handler执行的时候,它已经持有了该spinlock,而属于kretprobe的ret_handler本身同样也需要该spinlock,因此会死锁。
啦啦啦,这就是为什么我喜欢纯手工活儿的原因了,因为它可控啊!
来来来,试试ftrace,相比于kretprobe而言,它更简单,我觉得它应该没问题,如果再不行,就只能上纯手工艺了。
先来试试框架,下面是一个什么都不做的框架代码:
#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/ftrace.h>struct wrap_hook {char *name;struct ftrace_ops ops;unsigned long wrap_func;unsigned long entry;
};void (*real_raw_spin_lock)(spinlock_t *lock);// _raw_spin_lock的wrap函数!
void my_raw_spin_lock(spinlock_t *lock)
{real_raw_spin_lock(lock);
}void (*real_raw_spin_unlock)(spinlock_t *lock);// _raw_spin_unlock的wrap函数!
void my_raw_spin_unlock(spinlock_t *lock)
{real_raw_spin_unlock(lock);
}
struct wrap_hook spinlock_hooks[2] = {{.name = "_raw_spin_lock",.wrap_func = (unsigned long)my_raw_spin_lock,},{.name = "_raw_spin_unlock",.wrap_func = (unsigned long)my_raw_spin_unlock,}
};// 该函数是一个汇聚器,起到将逻辑return到new function的目的。
// 采用汇聚器可以避免为两个函数编写两个独立的hook函数,这是设计模式应用的例子。
void stub_func(unsigned long ip, unsigned long parent_ip,struct ftrace_ops *ops, struct pt_regs *regs)
{struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops);// 替换返回地址regs->ip = hook->wrap_func;
}static int wrap_spinlock_init(void)
{int i;for (i = 0; i < 2; i++) {spinlock_hooks[i].ops.func = stub_func;spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE;spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name);if (i == 0)real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE);elsereal_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE);ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0);register_ftrace_function(&spinlock_hooks[i].ops);}return 0;
}static void wrap_spinlock_exit(void)
{int i;for (i = 0; i < 2; i++) {unregister_ftrace_function(&spinlock_hooks[i].ops);ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0);}
}
module_init(wrap_spinlock_init);
module_exit(wrap_spinlock_exit);MODULE_LICENSE("GPL");
当我加载这个模块的时候,系统像什么都没有发生一样平静,而我用crash看_raw_spin_lock/_raw_spin_unlock的时候,显然它们已经被hook了,这意味着,wrap起作用了。
下图展示了ftrace实现wrap的原理:
这个已经和手工做法几乎无异了。可以拿这个ftrace和上面的kretprobe对比一下,感受一下雷同和差异:
- kretprobe一般也是用来probe函数而不是指令,这一点和ftrace一致。
- ftrace机制上更加直接,而kretprobe则更加trick一点。
- 如果要完成一个业务需求,我选择用ftrace,如果要表演,我会照抄一套kretprobe的机制,如果纯自己玩,我选择纯手工艺。
好了,现在把spinlock统计逻辑放进去,代码简单,自己感受:
#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/ftrace.h>struct wrap_hook {char *name;struct ftrace_ops ops;unsigned long wrap_func;unsigned long entry;
};struct lock_owner {int idx;spinlock_t *lock;struct task_struct *owner;struct task_struct *lock_owner;
};struct lock_owner plane[32768];
unsigned long bitmap[512];void (*real_raw_spin_lock)(spinlock_t *lock);
void (*real_raw_spin_unlock)(spinlock_t *lock);#pragma GCC optimize ("O0")
void my_raw_spin_lock(spinlock_t *lock)
{struct lock_owner *owner = NULL;int i = -1, retry = 0;// 通篇的header和tailer不要有spinlock,因此即便是printk也不能使用,否则会造成嵌套死锁(毕竟把spin_lock/unlock给hook了)。// 因此,下面实现了一个简单的lock机制,使用原子的test_and_set操作。
again:i = find_first_zero_bit(bitmap, 32768);if (test_and_set_bit(i, bitmap)) {if (retry ++ > 10)goto real;goto again;}owner = &plane[i];owner->idx = i;owner->lock = lock;owner->owner = current;owner->lock_owner = NULL;set_bit(i, bitmap);
real:barrier();real_raw_spin_lock(lock);barrier();if (owner)owner->lock_owner = current;
}#if 0
void test()
{struct lock_owner *owner = NULL;int i = -1;int cnt = 4;again:spin_lock(&maplock);i = find_first_zero_bit(bitmap, 32768);set_bit(i, bitmap);spin_unlock(&maplock);owner = &plane[i];printk("i is:%d at :%p\n", i, owner);if (cnt --)goto again;
}
#endif#pragma GCC optimize ("O0")
void my_raw_spin_unlock(spinlock_t *lock)
{struct lock_owner *owner = NULL;int i = -1;real_raw_spin_unlock(lock);barrier();for (i = 0; i < 32768; i++) {// 注意,这里没有原子性保证,因此我的代码无法cover 100%要求原子的场景,可能会统计不准。if (test_bit(i, bitmap) &&plane[i].owner == current &&plane[i].lock_owner == current &&plane[i].lock == lock) {owner = &plane[i];owner->lock = NULL;owner->owner = NULL;owner->lock_owner = NULL;owner->idx = -1;barrier();clear_bit(i, bitmap);break;}}
}#pragma GCC optimize ("O0")
struct wrap_hook spinlock_hooks[2] = {{.name = "_raw_spin_lock",.wrap_func = (unsigned long)my_raw_spin_lock,},{.name = "_raw_spin_unlock",.wrap_func = (unsigned long)my_raw_spin_unlock,}
};void stub_func(unsigned long ip, unsigned long parent_ip,struct ftrace_ops *ops, struct pt_regs *regs)
{struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops);regs->ip = hook->wrap_func;
}#pragma GCC optimize ("O0")
static int wrap_spinlock_init(void)
{int i;memset((char *)plane, 0, sizeof(plane));memset((char *)bitmap, 0, sizeof(bitmap));// 打印出位图的地址,以便panic_resched模块使用!printk("plane:%p bitmap:%p\n", &plane[0], &bitmap[0]);
#if 0test();return 0;
#endiffor (i = 0; i < 2; i++) {spinlock_hooks[i].ops.func = stub_func;spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE;spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name);if (i == 0)real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + 5);elsereal_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + 5);ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0);register_ftrace_function(&spinlock_hooks[i].ops);}return 0;
}static void wrap_spinlock_exit(void)
{int i;struct lock_owner *owner;
#if 0for (i = 0; i < 32768; i++) {owner = &plane[i];if (test_bit(i, bitmap)) {if (owner->lock == NULL || owner->idx != i)continue;printk("[%d] lock:%p owner:%s[%d] lock_owner:%s[%d]\n",owner->idx,owner->lock,owner->owner?owner->owner->comm:"noone",owner->owner?owner->owner->pid:-2,owner->lock_owner?owner->lock_owner->comm:"null",owner->lock_owner?owner->lock_owner->pid:-1);}}
#endiffor (i = 0; i < 2; i++) {unregister_ftrace_function(&spinlock_hooks[i].ops);ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0);}
}
module_init(wrap_spinlock_init);
module_exit(wrap_spinlock_exit);MODULE_LICENSE("GPL");
值得注意的是,wrap函数中不能再调用任何会使用spinlock的函数(避免循环嵌套),因此一开始我决定自己用原子原语实现一个自己的自旋锁:
void lock(unsigned long *lock)
{while (test_and_set_bit(0, lock));
}void unlock(unsigned long *lock)
{*lock = 0; // 比clear_bit更帅
}
然而系统很快就卡死了:
- 把系统所有的spinlock操作全部在此处唯一的自旋锁上串行化,不卡死才怪!
于是,只能退而求其次,采用宽松的约束了:
- 实在没有slot了,就不统计该次记录了。
所以,我这个spinlock快照记录机制, 它是不准的。
以上就是一个简单的spinlock快照机制的代码和说明,它可以展示:
- 当前系统中都有哪些task在争抢哪一把spinlock。
- 当前系统中某个spinlock被哪一个task所持有。
这个机制有什么用呢?
回到本文的开头,如果想让panic被schedule出去而不是宕机,我在担心current持有锁怎么办,我希望有一个办法让我知道current是否持有spinlock以及持有了哪些spinlock,然后将它们unlock。
现在有办法了:
// 通过lock_detect模块init函数中printk出来的bitmap地址来设置。
unsigned long pbitmap;
module_param(pbitmap, ulong, 0644);// 通过lock_detect模块init函数中printk出来的plane地址来设置。
unsigned long pplane;
module_param(pplane, ulong, 0644);void stub_panic(const char *fmt, ...)
{int i;unsigned long *bitmap = (unsigned long *)pbitmap;struct lock_owner *plane = (struct lock_owner *)pplane;// 循环遍历所有当前系统当事的spinlock,解锁current所持有的spinlock。for (i = 0; i < 32768; i++) {if (test_bit(i, bitmap) &&plane[i].owner == current &&plane[i].lock_owner == current &&plane[i].lock &&plane[i].idx == i) {printk("lock hold:%p %s %d\n",plane[i].lock,plane[i].lock_owner?plane[i].lock_owner->comm:"aabb",plane[i].lock_owner?plane[i].lock_owner->pid:-1);spin_unlock(plane[i].lock);}}if (preempt_count())return;local_irq_enable();// 安全地退场!!__set_current_state(TASK_UNINTERRUPTIBLE);schedule();
}
...
强调一点,本文介绍的把戏无法揪出所有的spinlock状态,因为Linux内核spinlock的lock/unlock操作并非_raw_spin_lock/_raw_spin_unlock入口的,比如spin_lock_irqsave/spin_unlock_irqrestore就不使用_raw_spin_lock/_raw_spin_unlock入口,它们有自己的入口。因此你需要把所有这些入口都给ftrace hook了,才能全咯。
不过,这仅仅是一个把戏,何必去留这么多TODO,权当玩吧。
浙江温州皮鞋湿,下雨进水不会胖。