您的位置 首页 IOT

你了解过Linux内核的的tasklet机制和作业行列?

你了解过Linux内核的的tasklet机制和工作队列?-Tasklet的特点,也是tasklet的精髓就是:tasklet不能休眠,同一个tasklet不能在两个CPU上同时运行,但是不同tasklet可能在不同CPU上同时运行,则需要注意共享数据的保护。

1. Tasklet机制剖析

上面咱们介绍了软中止机制,linux内核为什么还要引进tasklet机制呢?首要原因是软中止的pending标志位也就32位,一般状况是不随意增加软中止处理的。并且内核也没有供给通用的增加软中止的接口。其次内,软中止处理函数要求可重入,需求考虑到竞赛条件比较多,要求比较高的编程技巧。所以内核供给了tasklet这样的一种通用的机制。

其实每次写总结的文章,总是想把细节的东西说了解,所以越写越多。这样做的优点是能真实了解其间的机制。可是,内容太多的一个害处便是莫非回忆,所以,在讲清楚讲详细的一起,我还要把精华总结出来。Tasklet的特色,也是tasklet的精华便是:tasklet不能休眠,同一个tasklet不能在两个CPU上一起运转,可是不同tasklet或许在不同CPU上一起运转,则需求注意同享数据的维护。

首要的数据结构

static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);

staTIc DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

struct tasklet_struct{ struct tasklet_struct *next; unsigned long state; atomic_t count; void (*func)(unsigned long); unsigned long data;};

1

2

3

4

5

6

7

8

struct tasklet_struct

{

struct tasklet_struct *next;

unsigned long state;

atomic_t count;

void (*func)(unsigned long);

unsigned long data;

};

怎么运用tasklet

运用tasklet比较简略,只需求初始化一个tasklet_struct结构体,然后调用tasklet_schedule,就能使用tasklet机制履行初始化的func函数。

static inline void tasklet_schedule(struct tasklet_struct *t){ if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_schedule(t);}

1

2

3

4

5

staTIc inline void tasklet_schedule(struct tasklet_struct *t)

{

if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))

__tasklet_schedule(t);

}

tasklet_schedule处理进程也比较简略,便是把tasklet_struct结构体挂到tasklet_vec链表或许挂接到tasklet_hi_vec链表上,并调度软中止TASKLET_SOFTIRQ或许HI_SOFTIRQ

void __tasklet_schedule(struct tasklet_struct *t){ unsigned long flags;local_irq_save(flags); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next); raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_restore(flags);}EXPORT_SYMBOL(__tasklet_schedule);void __tasklet_hi_schedule(struct tasklet_struct *t){ unsigned long flags; local_irq_save(flags); t->next = NULL; *__get_cpu_var(tasklet_hi_vec).tail = t; __get_cpu_var(tasklet_hi_vec).tail = &(t->next); raise_softirq_irqoff(HI_SOFTIRQ); local_irq_restore(flags);}EXPORT_SYMBOL(__tasklet_hi_schedule);

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

void __tasklet_schedule(struct tasklet_struct *t)

{

unsigned long flags;local_irq_save(flags);

t->next = NULL;

*__get_cpu_var(tasklet_vec).tail = t;

__get_cpu_var(tasklet_vec).tail = &(t->next);

raise_softirq_irqoff(TASKLET_SOFTIRQ);

local_irq_restore(flags);

}

EXPORT_SYMBOL(__tasklet_schedule);

void __tasklet_hi_schedule(struct tasklet_struct *t)

{

unsigned long flags;

local_irq_save(flags);

t->next = NULL;

*__get_cpu_var(tasklet_hi_vec).tail = t;

__get_cpu_var(tasklet_hi_vec).tail = &(t->next);

raise_softirq_irqoff(HI_SOFTIRQ);

local_irq_restore(flags);

}

EXPORT_SYMBOL(__tasklet_hi_schedule);

Tasklet履行进程

Tasklet_action在软中止TASKLET_SOFTIRQ被调度到后会被履行,它从tasklet_vec链表中把tasklet_struct结构体都取下来,然后逐一履行。假如t->count的值等于0,阐明这个tasklet在调度之后,被disable掉了,所以会将tasklet结构体从头放回到tasklet_vec链表,并从头调度TASKLET_SOFTIRQ软中止,在之后enable这个tasklet之后从头再履行它。

static void tasklet_action(struct softirq_action *a){ struct tasklet_struct *list;local_irq_disable(); list = __get_cpu_var(tasklet_vec).head; __get_cpu_var(tasklet_vec).head = NULL; __get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head; local_irq_enable(); while (list) { struct tasklet_struct *t = list; list = list->next; if (tasklet_trylock(t)) { if (!atomic_read(&t->count)) { if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state)) BUG(); t->func(t->data); tasklet_unlock(t); continue; } tasklet_unlock(t); } local_irq_disable(); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next); __raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_enable(); }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

static void tasklet_action(struct softirq_action *a)

{

struct tasklet_struct *list;local_irq_disable();

list = __get_cpu_var(tasklet_vec).head;

__get_cpu_var(tasklet_vec).head = NULL;

__get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;

local_irq_enable();

while (list)

{

struct tasklet_struct *t = list;

list = list->next;

if (tasklet_trylock(t))

{

if (!atomic_read(&t->count))

{

if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))

BUG();

t->func(t->data);

tasklet_unlock(t);

continue;

}

tasklet_unlock(t);

}

local_irq_disable();

t->next = NULL;

*__get_cpu_var(tasklet_vec).tail = t;

__get_cpu_var(tasklet_vec).tail = &(t->next);

__raise_softirq_irqoff(TASKLET_SOFTIRQ);

local_irq_enable();

}

}

2. Linux作业行列

前面现已介绍了tasklet机制,有了tasklet机制为什么还要增加作业行列机制呢?我的了解是因为tasklet机制的约束,变形tasklet中的回调函数有许多的约束,比方不能有休眠的操作等等。而是用作业行列机制,需求处理的函数在进程上下文中调用,休眠操作都是答应的。可是作业行列的实时性不如tasklet,选用作业行列的例程或许不能在短时间内被调用履行。

数据结构阐明

首要需求阐明的是workqueue_struct和cpu_workqueue_struct这两个数据结构,创立一个作业行列首要需求创立workqueue_struct,然后能够在每个CPU上创立一个cpu_workqueue_struct办理结构体。

struct cpu_workqueue_struct{ spinlock_t lock; struct list_head worklist; wait_queue_head_t more_work; struct work_struct *current_work; struct workqueue_struct *wq; struct task_struct *thread; int run_depth; /* Detect run_workqueue() recursion depth */} ____cacheline_aligned;/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: */struct workqueue_struct{ struct cpu_workqueue_struct *cpu_wq; struct list_head list; const char *name; int singlethread; int freezeable; /* Freeze threads during suspend */ int rt;#ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map;#endif};

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

struct cpu_workqueue_struct

{

spinlock_t lock;

struct list_head worklist;

wait_queue_head_t more_work;

struct work_struct *current_work;

struct workqueue_struct *wq;

struct task_struct *thread;

int run_depth;        /* Detect run_workqueue() recursion depth */

} ____cacheline_aligned;

/*

* The externally visible workqueue abstraction is an array of

* per-CPU workqueues:

*/

struct workqueue_struct

{

struct cpu_workqueue_struct *cpu_wq;

struct list_head list;

const char *name;

int singlethread;

int freezeable;        /* Freeze threads during suspend */

int rt;

#ifdef CONFIG_LOCKDEP

struct lockdep_map lockdep_map;

#endif

};

Work_struct表明即将提交的处理的作业。

struct work_struct{ atomic_long_t data;#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */#define WORK_STRUCT_FLAG_MASK (3UL)#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK) struct list_head entry; work_func_t func;#ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map;#endif};

1

2

3

4

5

6

7

8

9

10

11

12

struct work_struct

{

atomic_long_t data;

#define WORK_STRUCT_PENDING 0        /* T if work item pending execution */

#define WORK_STRUCT_FLAG_MASK (3UL)

#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)

struct list_head entry;

work_func_t func;

#ifdef CONFIG_LOCKDEP

struct lockdep_map lockdep_map;

#endif

};

上面三个数据结构的联系如下图所示

介绍首要数据结构的意图并不是想要把作业行列详细的细节说了解,首要的意图是给咱们一个总的架构的概括。详细的剖析在下面打开。从上面的该模块首要数据结构的联系来看,首要需求剖析如下几个问题:

1. Workqueque是怎样创立的,包含event/0内核进程的创立

2. Work_queue是怎么提交到作业行列的

3. Event/0内核进程怎么处理提交到行列上的作业

Workqueque的创立

首要申请了workqueue_struct结构体内存,cpu_workqueue_struct结构体的内存。然后在init_cpu_workqueue函数中对cpu_workqueue_struct结构体进行初始化。一起调用create_workqueue_thread函数创立处理作业行列的内核进程。

create_workqueue_thread中创立了如下的内核进程

p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);

最终调用start_workqueue_thread发动新创立的进程。

struct workqueue_struct *__create_workqueue_key(const char *name, int singlethread, int freezeable, int rt, struct lock_class_key *key, const char *lock_name){ struct workqueue_struct *wq; struct cpu_workqueue_struct *cwq; int err = 0, cpu;wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return NULL; wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); if (!wq->cpu_wq) { kfree(wq); return NULL; } wq->name = name; lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); wq->singlethread = singlethread; wq->freezeable = freezeable; wq->rt = rt; INIT_LIST_HEAD(&wq->list); if (singlethread) { cwq = init_cpu_workqueue(wq, singlethread_cpu); err = create_workqueue_thread(cwq, singlethread_cpu); start_workqueue_thread(cwq, -1); } else { cpu_maps_update_begin(); /* * We must place this wq on list even if the code below fails. * cpu_down(cpu) can remove cpu from cpu_populated_map before * destroy_workqueue() takes the lock, in that case we leak * cwq[cpu]->thread. */ spin_lock(&workqueue_lock); list_add(&wq->list, &workqueues); spin_unlock(&workqueue_lock); /* * We must initialize cwqs for each possible cpu even if we * are going to call destroy_workqueue() finally. Otherwise * cpu_up() can hit the uninitialized cwq once we drop the * lock. */ for_each_possible_cpu(cpu) { cwq = init_cpu_workqueue(wq, cpu); if (err || !cpu_online(cpu)) continue; err = create_workqueue_thread(cwq, cpu); start_workqueue_thread(cwq, cpu); } cpu_maps_update_done(); } if (err) { destroy_workqueue(wq); wq = NULL; } return wq;}EXPORT_SYMBOL_GPL(__create_workqueue_key);

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

struct workqueue_struct *__create_workqueue_key(const char *name,

int singlethread,

int freezeable,

int rt,

struct lock_class_key *key,

const char *lock_name)

{

struct workqueue_struct *wq;

struct cpu_workqueue_struct *cwq;

int err = 0, cpu;wq = kzalloc(sizeof(*wq), GFP_KERNEL);

if (!wq)

return NULL;

wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);

if (!wq->cpu_wq)

{

kfree(wq);

return NULL;

}

wq->name = name;

lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);

wq->singlethread = singlethread;

wq->freezeable = freezeable;

wq->rt = rt;

INIT_LIST_HEAD(&wq->list);

if (singlethread)

{

cwq = init_cpu_workqueue(wq, singlethread_cpu);

err = create_workqueue_thread(cwq, singlethread_cpu);

start_workqueue_thread(cwq, -1);

}

else

{

cpu_maps_update_begin();

/*

* We must place this wq on list even if the code below fails.

* cpu_down(cpu) can remove cpu from cpu_populated_map before

* destroy_workqueue() takes the lock, in that case we leak

* cwq[cpu]->thread.

*/

spin_lock(&workqueue_lock);

list_add(&wq->list, &workqueues);

spin_unlock(&workqueue_lock);

/*

* We must initialize cwqs for each possible cpu even if we

* are going to call destroy_workqueue() finally. Otherwise

* cpu_up() can hit the uninitialized cwq once we drop the

* lock.

*/

for_each_possible_cpu(cpu)

{

cwq = init_cpu_workqueue(wq, cpu);

if (err || !cpu_online(cpu))

continue;

err = create_workqueue_thread(cwq, cpu);

start_workqueue_thread(cwq, cpu);

}

cpu_maps_update_done();

}

if (err)

{

destroy_workqueue(wq);

wq = NULL;

}

return wq;

}

EXPORT_SYMBOL_GPL(__create_workqueue_key);

向作业行列中增加作业

Shedule_work 函数向作业行列中增加使命。这个接口比较简略,无非是一些行列操作,不再叙说。

/** * schedule_work – put work task in global workqueue * @work: job to be done * * This puts a job in the kernel-global workqueue. */int schedule_work(struct work_struct *work){ return queue_work(keventd_wq, work);}EXPORT_SYMBOL(schedule_work);

1

2

3

4

5

6

7

8

9

10

11

/**

* schedule_work – put work task in global workqueue

* @work: job to be done

*

* This puts a job in the kernel-global workqueue.

*/

int schedule_work(struct work_struct *work)

{

return queue_work(keventd_wq, work);

}

EXPORT_SYMBOL(schedule_work);

作业行列内核进程的处理进程

在创立作业行列的时分,咱们创立了一个或许多个进程来处理挂到行列上的作业。这个内核进程的首要函数体为worker_thread,这个函数比较有意思的当地便是,自己下降的优先级,阐明worker_thread调度的优先级比较低。在体系负载大大时分,选用作业行列履行的操作或许存在较大的推迟。

就函数的履行流程来说是诚心的简略,仅仅从行列中取出work,从行列中删除去,清除去pending符号,并履行work设置的回调函数。

static int worker_thread(void *__cwq){ struct cpu_workqueue_struct *cwq = __cwq; DEFINE_WAIT(wait);if (cwq->wq->freezeable) set_freezable(); set_user_nice(current, -5); for (;;) { prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); if (!freezing(current) && !kthread_should_stop() && list_empty(&cwq->worklist)) schedule(); finish_wait(&cwq->more_work, &wait); try_to_freeze(); if (kthread_should_stop()) break; run_workqueue(cwq); } return 0;}static void run_workqueue(struct cpu_workqueue_struct *cwq){ spin_lock_irq(&cwq->lock); cwq->run_depth++; if (cwq->run_depth > 3) { /* morton gets to eat his hat */ printk(“%s: recursion depth exceeded: %dn”, __func__, cwq->run_depth); dump_stack(); } while (!list_empty(&cwq->worklist)) { struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); work_func_t f = work->func;#ifdef CONFIG_LOCKDEP /* * It is permissible to free the struct work_struct * from inside the function that is called from it, * this we need to take into account for lockdep too. * To avoid bogus “held lock freed” warnings as well * as problems when looking into work->lockdep_map, * make a copy and use that here. */ struct lockdep_map lockdep_map = work->lockdep_map;#endifcwq->current_work = work; list_del_init(cwq->worklist.next); spin_unlock_irq(&cwq->lock); BUG_ON(get_wq_data(work) != cwq); work_clear_pending(work); lock_map_acquire(&cwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); f(work); lock_map_release(&lockdep_map); lock_map_release(&cwq->wq->lockdep_map); if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { printk(KERN_ERR “BUG: workqueue leaked lock or atomic: ” “%s/0x%08x/%dn”, current->comm, preempt_count(), task_pid_nr(current)); printk(KERN_ERR ” last function: “); print_symbol(“%sn”, (unsigned long)f); debug_show_held_locks(current); dump_stack(); } spin_lock_irq(&cwq->lock); cwq->current_work = NULL; } cwq->run_depth–; spin_unlock_irq(&cwq->lock);}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

static int worker_thread(void *__cwq)

{

struct cpu_workqueue_struct *cwq = __cwq;

DEFINE_WAIT(wait);if (cwq->wq->freezeable)

set_freezable();

set_user_nice(current, -5);

for (;;)

{

prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);

if (!freezing(current) &&

!kthread_should_stop() &&

list_empty(&cwq->worklist))

schedule();

finish_wait(&cwq->more_work, &wait);

try_to_freeze();

if (kthread_should_stop())

break;

run_workqueue(cwq);

}

return 0;

}

static void run_workqueue(struct cpu_workqueue_struct *cwq)

{

spin_lock_irq(&cwq->lock);

cwq->run_depth++;

if (cwq->run_depth > 3)

{

/* morton gets to eat his hat */

printk(“%s: recursion depth exceeded: %dn”,

__func__, cwq->run_depth);

dump_stack();

}

while (!list_empty(&cwq->worklist))

{

struct work_struct *work = list_entry(cwq->worklist.next,

struct work_struct, entry);

work_func_t f = work->func;

#ifdef CONFIG_LOCKDEP

/*

* It is permissible to free the struct work_struct

* from inside the function that is called from it,

* this we need to take into account for lockdep too.

* To avoid bogus “held lock freed” warnings as well

* as problems when looking into work->lockdep_map,

* make a copy and use that here.

*/

struct lockdep_map lockdep_map = work->lockdep_map;

#endifcwq->current_work = work;

list_del_init(cwq->worklist.next);

spin_unlock_irq(&cwq->lock);

BUG_ON(get_wq_data(work) != cwq);

work_clear_pending(work);

lock_map_acquire(&cwq->wq->lockdep_map);

lock_map_acquire(&lockdep_map);

f(work);

lock_map_release(&lockdep_map);

lock_map_release(&cwq->wq->lockdep_map);

if (unlikely(in_atomic() || lockdep_depth(current) > 0))

{

printk(KERN_ERR “BUG: workqueue leaked lock or atomic: “

“%s/0x%08x/%dn”,

current->comm, preempt_count(),

task_pid_nr(current));

printk(KERN_ERR ”    last function: “);

print_symbol(“%sn”, (unsigned long)f);

debug_show_held_locks(current);

dump_stack();

}

spin_lock_irq(&cwq->lock);

cwq->current_work = NULL;

}

cwq->run_depth–;

spin_unlock_irq(&cwq->lock);

}

 

声明:本文内容来自网络转载或用户投稿,文章版权归原作者和原出处所有。文中观点,不代表本站立场。若有侵权请联系本站删除(kf@86ic.com)https://www.86ic.net/yingyong/iot/90189.html

为您推荐

联系我们

联系我们

在线咨询: QQ交谈

邮箱: kf@86ic.com

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部