linux/drivers/staging/dst/thread_pool.c
/*
 * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 */

#include <linux/kernel.h>
#include <linux/dst.h>
#include <linux/kthread.h>
#include <linux/slab.h>

/*
 * The thread pool abstraction allows scheduling work to be performed on
 * behalf of a kernel thread. Users do not operate on threads themselves;
 * instead they provide init and cleanup callbacks for each pool thread,
 * and setup and action callbacks for each submitted work item. An
 * illustrative sketch of these callbacks follows below.
 *
 * Each worker has private data, initialized at creation time, and
 * per-work data provided by the user at scheduling time.
 *
 * While an action is being performed the thread cannot be used by other
 * users; they sleep until a free thread is available to pick up their
 * work.
 */
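
/*
 * Illustrative sketches of the two callback levels described above; the
 * example_* names are hypothetical and are not used by the driver itself.
 * init()/cleanup() manage a worker's private data for its whole lifetime,
 * while setup()/action() run once per submitted work item.
 */
static __maybe_unused void *example_thread_init(void *private)
{
        /* Set up per-thread context; may return ERR_PTR() on failure. */
        return private;
}

static __maybe_unused void example_thread_cleanup(void *private)
{
        /* Release whatever example_thread_init() allocated; nothing here. */
}

static __maybe_unused int example_setup(void *private, void *data)
{
        /* Prepare the worker for the submitted data before it is woken. */
        return 0;
}

static __maybe_unused int example_action(void *private, void *data)
{
        /* The actual work, executed in the worker thread's context. */
        return 0;
}
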
struct thread_pool_worker {
        struct list_head        worker_entry;

        struct task_struct      *thread;

        struct thread_pool      *pool;

        int                     error;
        int                     has_data;
        int                     need_exit;
        unsigned int            id;

        wait_queue_head_t       wait;

        void                    *private;
        void                    *schedule_data;

        int                     (*action)(void *private, void *schedule_data);
        void                    (*cleanup)(void *private);
};

/*
 * Stop the worker's kthread and release its private data.
 */
static void thread_pool_exit_worker(struct thread_pool_worker *w)
{
        kthread_stop(w->thread);

        w->cleanup(w->private);
        kfree(w);
}

/*
 * Mark the thread as ready so users can schedule new work on it, or reap
 * it if it was asked to exit while active.
 */
static void thread_pool_worker_make_ready(struct thread_pool_worker *w)
{
        struct thread_pool *p = w->pool;

        mutex_lock(&p->thread_lock);

        if (!w->need_exit) {
                list_move_tail(&w->worker_entry, &p->ready_list);
                w->has_data = 0;
                mutex_unlock(&p->thread_lock);

                wake_up(&p->wait);
        } else {
                p->thread_num--;
                list_del(&w->worker_entry);
                mutex_unlock(&p->thread_lock);

                thread_pool_exit_worker(w);
        }
}

/*
 * Thread action loop: waits until there is new work.
 */
static int thread_pool_worker_func(void *data)
{
        struct thread_pool_worker *w = data;

        while (!kthread_should_stop()) {
                wait_event_interruptible(w->wait,
                        kthread_should_stop() || w->has_data);

                if (kthread_should_stop())
                        break;

                if (!w->has_data)
                        continue;

                w->action(w->private, w->schedule_data);
                thread_pool_worker_make_ready(w);
        }

        return 0;
}

/*
 * Remove a single worker without specifying which one.
 */
void thread_pool_del_worker(struct thread_pool *p)
{
        struct thread_pool_worker *w = NULL;

        while (!w && p->thread_num) {
                wait_event(p->wait, !list_empty(&p->ready_list) ||
                                !p->thread_num);

                dprintk("%s: locking list_empty: %d, thread_num: %d.\n",
                                __func__, list_empty(&p->ready_list),
                                p->thread_num);

                mutex_lock(&p->thread_lock);
                if (!list_empty(&p->ready_list)) {
                        w = list_first_entry(&p->ready_list,
                                        struct thread_pool_worker,
                                        worker_entry);

                        dprintk("%s: deleting w: %p, thread_num: %d, list: %p [%p.%p].\n",
                                        __func__, w, p->thread_num, &p->ready_list,
                                        p->ready_list.prev, p->ready_list.next);

                        p->thread_num--;
                        list_del(&w->worker_entry);
                }
                mutex_unlock(&p->thread_lock);
        }

        if (w)
                thread_pool_exit_worker(w);
        dprintk("%s: deleted w: %p, thread_num: %d.\n",
                        __func__, w, p->thread_num);
}

/*
 * Remove the worker with the given ID. If it is currently active, it is
 * only marked for exit; thread_pool_worker_make_ready() will reap it when
 * its work completes.
 */
void thread_pool_del_worker_id(struct thread_pool *p, unsigned int id)
{
        struct thread_pool_worker *w;
        int found = 0;

        mutex_lock(&p->thread_lock);
        list_for_each_entry(w, &p->ready_list, worker_entry) {
                if (w->id == id) {
                        found = 1;
                        p->thread_num--;
                        list_del(&w->worker_entry);
                        break;
                }
        }

        if (!found) {
                list_for_each_entry(w, &p->active_list, worker_entry) {
                        if (w->id == id) {
                                w->need_exit = 1;
                                break;
                        }
                }
        }
        mutex_unlock(&p->thread_lock);

        if (found)
                thread_pool_exit_worker(w);
}

/*
 * Add a new worker thread with the given parameters.
 * Returns an error if thread creation or the initialization callback fails.
 */
int thread_pool_add_worker(struct thread_pool *p,
                char *name,
                unsigned int id,
                void *(*init)(void *private),
                void (*cleanup)(void *private),
                void *private)
{
        struct thread_pool_worker *w;
        int err = -ENOMEM;

        w = kzalloc(sizeof(struct thread_pool_worker), GFP_KERNEL);
        if (!w)
                goto err_out_exit;

        w->pool = p;
        init_waitqueue_head(&w->wait);
        w->cleanup = cleanup;
        w->id = id;

        w->thread = kthread_run(thread_pool_worker_func, w, "%s", name);
        if (IS_ERR(w->thread)) {
                err = PTR_ERR(w->thread);
                goto err_out_free;
        }

        w->private = init(private);
        if (IS_ERR(w->private)) {
                err = PTR_ERR(w->private);
                goto err_out_stop_thread;
        }

        mutex_lock(&p->thread_lock);
        list_add_tail(&w->worker_entry, &p->ready_list);
        p->thread_num++;
        mutex_unlock(&p->thread_lock);

        return 0;

err_out_stop_thread:
        kthread_stop(w->thread);
err_out_free:
        kfree(w);
err_out_exit:
        return err;
}
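
/*
 * A minimal sketch of growing a pool at runtime; the pool, context and
 * id are hypothetical placeholders, not driver state.
 */
static __maybe_unused int example_grow_pool(struct thread_pool *pool,
                void *ctx)
{
        /* IDs are caller-chosen; 10 is just an example here. */
        return thread_pool_add_worker(pool, "dst_example", 10,
                        example_thread_init, example_thread_cleanup, ctx);
}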

/*
 * Destroy the whole pool.
 */
void thread_pool_destroy(struct thread_pool *p)
{
        while (p->thread_num) {
                dprintk("%s: num: %d.\n", __func__, p->thread_num);
                thread_pool_del_worker(p);
        }

        kfree(p);
}

/*
 * Create a pool with the given number of threads.
 * Workers get sequential IDs starting from zero.
 */
struct thread_pool *thread_pool_create(int num, char *name,
                void *(*init)(void *private),
                void (*cleanup)(void *private),
                void *private)
{
        struct thread_pool_worker *w, *tmp;
        struct thread_pool *p;
        int err = -ENOMEM;
        int i;

        p = kzalloc(sizeof(struct thread_pool), GFP_KERNEL);
        if (!p)
                goto err_out_exit;

        init_waitqueue_head(&p->wait);
        mutex_init(&p->thread_lock);
        INIT_LIST_HEAD(&p->ready_list);
        INIT_LIST_HEAD(&p->active_list);
        p->thread_num = 0;

        for (i = 0; i < num; ++i) {
                err = thread_pool_add_worker(p, name, i, init,
                                cleanup, private);
                if (err)
                        goto err_out_free_all;
        }

        return p;

err_out_free_all:
        list_for_each_entry_safe(w, tmp, &p->ready_list, worker_entry) {
                list_del(&w->worker_entry);
                thread_pool_exit_worker(w);
        }
        kfree(p);
err_out_exit:
        return ERR_PTR(err);
}
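
/*
 * A minimal creation sketch using the example_* callbacks above; note
 * that thread_pool_create() returns ERR_PTR() on failure, so the result
 * must be checked with IS_ERR(). All names here are illustrative.
 */
static __maybe_unused struct thread_pool *example_create_pool(void *ctx)
{
        struct thread_pool *pool;

        /* Four workers, all named "dst_example" and sharing one context. */
        pool = thread_pool_create(4, "dst_example",
                        example_thread_init, example_thread_cleanup, ctx);
        if (IS_ERR(pool))
                return NULL;

        return pool;
}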

/*
 * Schedule execution of the action on a given thread: the provided ID
 * pointer has to match the private data previously returned by the
 * worker's init() callback. Returns 0 on success, -ETIMEDOUT if no
 * matching worker became ready in time, or a negative error otherwise.
 */
int thread_pool_schedule_private(struct thread_pool *p,
                int (*setup)(void *private, void *data),
                int (*action)(void *private, void *data),
                void *data, long timeout, void *id)
{
        struct thread_pool_worker *w, *tmp, *worker = NULL;
        int err = 0;

        while (!worker && !err) {
                timeout = wait_event_interruptible_timeout(p->wait,
                                !list_empty(&p->ready_list),
                                timeout);

                if (timeout < 0) {
                        /* Interrupted by a signal. */
                        err = timeout;
                        break;
                }

                if (!timeout) {
                        err = -ETIMEDOUT;
                        break;
                }

                worker = NULL;
                mutex_lock(&p->thread_lock);
                list_for_each_entry_safe(w, tmp, &p->ready_list, worker_entry) {
                        if (id && id != w->private)
                                continue;

                        worker = w;

                        list_move_tail(&w->worker_entry, &p->active_list);

                        err = setup(w->private, data);
                        if (!err) {
                                w->schedule_data = data;
                                w->action = action;
                                w->has_data = 1;
                                wake_up(&w->wait);
                        } else {
                                list_move_tail(&w->worker_entry, &p->ready_list);
                        }

                        break;
                }
                mutex_unlock(&p->thread_lock);
        }

        return err;
}
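
/*
 * A sketch of targeting one specific worker: the last argument must match
 * the private pointer example_thread_init() returned for that worker.
 * The five second timeout is an arbitrary example value.
 */
static __maybe_unused int example_schedule_on(struct thread_pool *pool,
                void *ctx, void *work)
{
        return thread_pool_schedule_private(pool, example_setup,
                        example_action, work, msecs_to_jiffies(5000), ctx);
}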

/*
 * Schedule execution on an arbitrary thread from the pool.
 */
int thread_pool_schedule(struct thread_pool *p,
                int (*setup)(void *private, void *data),
                int (*action)(void *private, void *data),
                void *data, long timeout)
{
        return thread_pool_schedule_private(p, setup,
                        action, data, timeout, NULL);
}
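
/*
 * A sketch of scheduling on whichever worker becomes ready first; names
 * and the timeout are illustrative.
 */
static __maybe_unused int example_schedule_any(struct thread_pool *pool,
                void *work)
{
        return thread_pool_schedule(pool, example_setup, example_action,
                        work, msecs_to_jiffies(1000));
}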