linux/tools/perf/bench/futex-wake-parallel.c
<<
>>
Prefs
/*
 * Copyright (C) 2015 Davidlohr Bueso.
 *
 * Block a bunch of threads and let parallel waker threads wake up an
 * equal number of them. The program output reflects the avg latency
 * for each individual thread to service its share of work. Ultimately
 * it can be used to measure futex_wake() changes.
 */
   9
/* For the CPU_*() macros */
  11#include <pthread.h>
  12
  13#include <signal.h>
  14#include "../util/stat.h"
  15#include <subcmd/parse-options.h>
  16#include <linux/compiler.h>
  17#include <linux/kernel.h>
  18#include <linux/time64.h>
  19#include <errno.h>
  20#include "bench.h"
  21#include "futex.h"
  22
  23#include <err.h>
  24#include <stdlib.h>
  25#include <sys/time.h>
  26
  27struct thread_data {
  28        pthread_t worker;
  29        unsigned int nwoken;
  30        struct timeval runtime;
  31};
  32
  33static unsigned int nwakes = 1;
  34
  35/* all threads will block on the same futex -- hash bucket chaos ;) */
  36static u_int32_t futex = 0;
  37
  38static pthread_t *blocked_worker;
  39static bool done = false, silent = false, fshared = false;
  40static unsigned int nblocked_threads = 0, nwaking_threads = 0;
  41static pthread_mutex_t thread_lock;
  42static pthread_cond_t thread_parent, thread_worker;
  43static struct stats waketime_stats, wakeup_stats;
  44static unsigned int ncpus, threads_starting;
  45static int futex_flag = 0;
  46
  47static const struct option options[] = {
  48        OPT_UINTEGER('t', "threads", &nblocked_threads, "Specify amount of threads"),
  49        OPT_UINTEGER('w', "nwakers", &nwaking_threads, "Specify amount of waking threads"),
  50        OPT_BOOLEAN( 's', "silent",  &silent,   "Silent mode: do not display data/details"),
  51        OPT_BOOLEAN( 'S', "shared",  &fshared,  "Use shared futexes instead of private ones"),
  52        OPT_END()
  53};
  54
  55static const char * const bench_futex_wake_parallel_usage[] = {
  56        "perf bench futex wake-parallel <options>",
  57        NULL
  58};
  59
  60static void *waking_workerfn(void *arg)
  61{
  62        struct thread_data *waker = (struct thread_data *) arg;
  63        struct timeval start, end;
  64
  65        gettimeofday(&start, NULL);
  66
  67        waker->nwoken = futex_wake(&futex, nwakes, futex_flag);
  68        if (waker->nwoken != nwakes)
  69                warnx("couldn't wakeup all tasks (%d/%d)",
  70                      waker->nwoken, nwakes);
  71
  72        gettimeofday(&end, NULL);
  73        timersub(&end, &start, &waker->runtime);
  74
  75        pthread_exit(NULL);
  76        return NULL;
  77}
  78
  79static void wakeup_threads(struct thread_data *td, pthread_attr_t thread_attr)
  80{
  81        unsigned int i;
  82
  83        pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
  84
  85        /* create and block all threads */
  86        for (i = 0; i < nwaking_threads; i++) {
  87                /*
  88                 * Thread creation order will impact per-thread latency
  89                 * as it will affect the order to acquire the hb spinlock.
  90                 * For now let the scheduler decide.
  91                 */
  92                if (pthread_create(&td[i].worker, &thread_attr,
  93                                   waking_workerfn, (void *)&td[i]))
  94                        err(EXIT_FAILURE, "pthread_create");
  95        }
  96
  97        for (i = 0; i < nwaking_threads; i++)
  98                if (pthread_join(td[i].worker, NULL))
  99                        err(EXIT_FAILURE, "pthread_join");
 100}
 101
 102static void *blocked_workerfn(void *arg __maybe_unused)
 103{
 104        pthread_mutex_lock(&thread_lock);
 105        threads_starting--;
 106        if (!threads_starting)
 107                pthread_cond_signal(&thread_parent);
 108        pthread_cond_wait(&thread_worker, &thread_lock);
 109        pthread_mutex_unlock(&thread_lock);
 110
 111        while (1) { /* handle spurious wakeups */
 112                if (futex_wait(&futex, 0, NULL, futex_flag) != EINTR)
 113                        break;
 114        }
 115
 116        pthread_exit(NULL);
 117        return NULL;
 118}
 119
 120static void block_threads(pthread_t *w, pthread_attr_t thread_attr)
 121{
 122        cpu_set_t cpu;
 123        unsigned int i;
 124
 125        threads_starting = nblocked_threads;
 126
 127        /* create and block all threads */
 128        for (i = 0; i < nblocked_threads; i++) {
 129                CPU_ZERO(&cpu);
 130                CPU_SET(i % ncpus, &cpu);
 131
 132                if (pthread_attr_setaffinity_np(&thread_attr, sizeof(cpu_set_t), &cpu))
 133                        err(EXIT_FAILURE, "pthread_attr_setaffinity_np");
 134
 135                if (pthread_create(&w[i], &thread_attr, blocked_workerfn, NULL))
 136                        err(EXIT_FAILURE, "pthread_create");
 137        }
 138}
 139
 140static void print_run(struct thread_data *waking_worker, unsigned int run_num)
 141{
 142        unsigned int i, wakeup_avg;
 143        double waketime_avg, waketime_stddev;
 144        struct stats __waketime_stats, __wakeup_stats;
 145
 146        init_stats(&__wakeup_stats);
 147        init_stats(&__waketime_stats);
 148
 149        for (i = 0; i < nwaking_threads; i++) {
 150                update_stats(&__waketime_stats, waking_worker[i].runtime.tv_usec);
 151                update_stats(&__wakeup_stats, waking_worker[i].nwoken);
 152        }
 153
 154        waketime_avg = avg_stats(&__waketime_stats);
 155        waketime_stddev = stddev_stats(&__waketime_stats);
 156        wakeup_avg = avg_stats(&__wakeup_stats);
 157
 158        printf("[Run %d]: Avg per-thread latency (waking %d/%d threads) "
 159               "in %.4f ms (+-%.2f%%)\n", run_num + 1, wakeup_avg,
 160               nblocked_threads, waketime_avg / USEC_PER_MSEC,
 161               rel_stddev_stats(waketime_stddev, waketime_avg));
 162}
 163
 164static void print_summary(void)
 165{
 166        unsigned int wakeup_avg;
 167        double waketime_avg, waketime_stddev;
 168
 169        waketime_avg = avg_stats(&waketime_stats);
 170        waketime_stddev = stddev_stats(&waketime_stats);
 171        wakeup_avg = avg_stats(&wakeup_stats);
 172
 173        printf("Avg per-thread latency (waking %d/%d threads) in %.4f ms (+-%.2f%%)\n",
 174               wakeup_avg,
 175               nblocked_threads,
 176               waketime_avg / USEC_PER_MSEC,
 177               rel_stddev_stats(waketime_stddev, waketime_avg));
 178}
 179
 180
 181static void do_run_stats(struct thread_data *waking_worker)
 182{
 183        unsigned int i;
 184
 185        for (i = 0; i < nwaking_threads; i++) {
 186                update_stats(&waketime_stats, waking_worker[i].runtime.tv_usec);
 187                update_stats(&wakeup_stats, waking_worker[i].nwoken);
 188        }
 189
 190}
 191
 192static void toggle_done(int sig __maybe_unused,
 193                        siginfo_t *info __maybe_unused,
 194                        void *uc __maybe_unused)
 195{
 196        done = true;
 197}
 198
 199int bench_futex_wake_parallel(int argc, const char **argv,
 200                              const char *prefix __maybe_unused)
 201{
 202        int ret = 0;
 203        unsigned int i, j;
 204        struct sigaction act;
 205        pthread_attr_t thread_attr;
 206        struct thread_data *waking_worker;
 207
 208        argc = parse_options(argc, argv, options,
 209                             bench_futex_wake_parallel_usage, 0);
 210        if (argc) {
 211                usage_with_options(bench_futex_wake_parallel_usage, options);
 212                exit(EXIT_FAILURE);
 213        }
 214
 215        sigfillset(&act.sa_mask);
 216        act.sa_sigaction = toggle_done;
 217        sigaction(SIGINT, &act, NULL);
 218
 219        ncpus = sysconf(_SC_NPROCESSORS_ONLN);
 220        if (!nblocked_threads)
 221                nblocked_threads = ncpus;
 222
 223        /* some sanity checks */
 224        if (nwaking_threads > nblocked_threads || !nwaking_threads)
 225                nwaking_threads = nblocked_threads;
 226
 227        if (nblocked_threads % nwaking_threads)
 228                errx(EXIT_FAILURE, "Must be perfectly divisible");
 229        /*
 230         * Each thread will wakeup nwakes tasks in
 231         * a single futex_wait call.
 232         */
 233        nwakes = nblocked_threads/nwaking_threads;
 234
 235        blocked_worker = calloc(nblocked_threads, sizeof(*blocked_worker));
 236        if (!blocked_worker)
 237                err(EXIT_FAILURE, "calloc");
 238
 239        if (!fshared)
 240                futex_flag = FUTEX_PRIVATE_FLAG;
 241
 242        printf("Run summary [PID %d]: blocking on %d threads (at [%s] "
 243               "futex %p), %d threads waking up %d at a time.\n\n",
 244               getpid(), nblocked_threads, fshared ? "shared":"private",
 245               &futex, nwaking_threads, nwakes);
 246
 247        init_stats(&wakeup_stats);
 248        init_stats(&waketime_stats);
 249
 250        pthread_attr_init(&thread_attr);
 251        pthread_mutex_init(&thread_lock, NULL);
 252        pthread_cond_init(&thread_parent, NULL);
 253        pthread_cond_init(&thread_worker, NULL);
 254
 255        for (j = 0; j < bench_repeat && !done; j++) {
 256                waking_worker = calloc(nwaking_threads, sizeof(*waking_worker));
 257                if (!waking_worker)
 258                        err(EXIT_FAILURE, "calloc");
 259
 260                /* create, launch & block all threads */
 261                block_threads(blocked_worker, thread_attr);
 262
 263                /* make sure all threads are already blocked */
 264                pthread_mutex_lock(&thread_lock);
 265                while (threads_starting)
 266                        pthread_cond_wait(&thread_parent, &thread_lock);
 267                pthread_cond_broadcast(&thread_worker);
 268                pthread_mutex_unlock(&thread_lock);
 269
 270                usleep(100000);
 271
 272                /* Ok, all threads are patiently blocked, start waking folks up */
 273                wakeup_threads(waking_worker, thread_attr);
 274
 275                for (i = 0; i < nblocked_threads; i++) {
 276                        ret = pthread_join(blocked_worker[i], NULL);
 277                        if (ret)
 278                                err(EXIT_FAILURE, "pthread_join");
 279                }
 280
 281                do_run_stats(waking_worker);
 282                if (!silent)
 283                        print_run(waking_worker, j);
 284
 285                free(waking_worker);
 286        }
 287
 288        /* cleanup & report results */
 289        pthread_cond_destroy(&thread_parent);
 290        pthread_cond_destroy(&thread_worker);
 291        pthread_mutex_destroy(&thread_lock);
 292        pthread_attr_destroy(&thread_attr);
 293
 294        print_summary();
 295
 296        free(blocked_worker);
 297        return ret;
 298}
 299