linux/drivers/xen/xenbus/xenbus_xs.c
<<
>>
Prefs
   1/******************************************************************************
   2 * xenbus_xs.c
   3 *
   4 * This is the kernel equivalent of the "xs" library.  We don't need everything
   5 * and we use xenbus_comms for communication.
   6 *
   7 * Copyright (C) 2005 Rusty Russell, IBM Corporation
   8 *
   9 * This program is free software; you can redistribute it and/or
  10 * modify it under the terms of the GNU General Public License version 2
  11 * as published by the Free Software Foundation; or, when distributed
  12 * separately from the Linux kernel or incorporated into other
  13 * software packages, subject to the following license:
  14 *
  15 * Permission is hereby granted, free of charge, to any person obtaining a copy
  16 * of this source file (the "Software"), to deal in the Software without
  17 * restriction, including without limitation the rights to use, copy, modify,
  18 * merge, publish, distribute, sublicense, and/or sell copies of the Software,
  19 * and to permit persons to whom the Software is furnished to do so, subject to
  20 * the following conditions:
  21 *
  22 * The above copyright notice and this permission notice shall be included in
  23 * all copies or substantial portions of the Software.
  24 *
  25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  26 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  27 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  28 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  29 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  30 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  31 * IN THE SOFTWARE.
  32 */
  33
  34#include <linux/unistd.h>
  35#include <linux/errno.h>
  36#include <linux/types.h>
  37#include <linux/uio.h>
  38#include <linux/kernel.h>
  39#include <linux/string.h>
  40#include <linux/err.h>
  41#include <linux/slab.h>
  42#include <linux/fcntl.h>
  43#include <linux/kthread.h>
  44#include <linux/rwsem.h>
  45#include <linux/module.h>
  46#include <linux/mutex.h>
  47#include <xen/xenbus.h>
  48#include <xen/xen.h>
  49#include "xenbus_comms.h"
  50
/*
 * One message received from xenstored, held on a list until it is consumed.
 * process_msg() links replies onto xs_state.reply_list and watch events onto
 * the watch_events list; the union member in use follows from which list
 * the message is on.
 */
struct xs_stored_msg {
        /* Linkage into the reply list or the watch-event list. */
        struct list_head list;

        /* Wire header exactly as read from the ring. */
        struct xsd_sockmsg hdr;

        union {
                /* Queued replies. */
                struct {
                        /* kmalloc'ed payload, NUL-terminated by process_msg(). */
                        char *body;
                } reply;

                /* Queued watch events. */
                struct {
                        /* Registered watch the event was matched to. */
                        struct xenbus_watch *handle;
                        /* String vector built by split(); one kfree() releases it. */
                        char **vec;
                        /* Number of entries in vec. */
                        unsigned int vec_size;
                } watch;
        } u;
};
  70
/*
 * State shared by everything in this file that talks to xenstored: the
 * queue of received replies, plus the locks that order requests,
 * transactions and watch (de)registration against suspend/resume.
 */
struct xs_handle {
        /* A list of replies. Currently only one will ever be outstanding. */
        struct list_head reply_list;
        /* Guards reply_list. */
        spinlock_t reply_lock;
        /* Woken by process_msg() each time a reply is queued. */
        wait_queue_head_t reply_waitq;

        /*
         * Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex.
         * response_mutex is never taken simultaneously with the other three.
         *
         * transaction_mutex must be held before incrementing
         * transaction_count. The mutex is held when a suspend is in
         * progress to prevent new transactions starting.
         *
         * When decrementing transaction_count to zero the wait queue
         * should be woken up, the suspend code waits for count to
         * reach zero.
         */

        /* One request at a time. */
        struct mutex request_mutex;

        /* Protect xenbus reader thread against save/restore. */
        struct mutex response_mutex;

        /* Protect transactions against save/restore. */
        struct mutex transaction_mutex;
        /* Number of transactions currently open. */
        atomic_t transaction_count;
        /* Woken by transaction_end() when transaction_count reaches zero. */
        wait_queue_head_t transaction_wq;

        /* Protect watch (de)register against save/restore. */
        struct rw_semaphore watch_mutex;
};
 104
/* The single xenstore connection state; its members are set up by xs_init(). */
static struct xs_handle xs_state;

/* List of registered watches, and a lock to protect it. */
static LIST_HEAD(watches);
static DEFINE_SPINLOCK(watches_lock);

/* List of pending watch callback events, and a lock to protect it. */
static LIST_HEAD(watch_events);
static DEFINE_SPINLOCK(watch_events_lock);

/*
 * Details of the xenwatch callback kernel thread. The thread waits on the
 * watch_events_waitq for work to do (queued on watch_events list). When it
 * wakes up it acquires the xenwatch_mutex before reading the list and
 * carrying out work.
 *
 * xenwatch_pid is recorded by xs_init(); unregister_xenbus_watch() compares
 * it with the current task so that a callback running on the xenwatch
 * thread does not try to take xenwatch_mutex a second time.
 */
static pid_t xenwatch_pid;
static DEFINE_MUTEX(xenwatch_mutex);
static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
 124
 125static int get_error(const char *errorstring)
 126{
 127        unsigned int i;
 128
 129        for (i = 0; strcmp(errorstring, xsd_errors[i].errstring) != 0; i++) {
 130                if (i == ARRAY_SIZE(xsd_errors) - 1) {
 131                        printk(KERN_WARNING
 132                               "XENBUS xen store gave: unknown error %s",
 133                               errorstring);
 134                        return EINVAL;
 135                }
 136        }
 137        return xsd_errors[i].errnum;
 138}
 139
 140static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
 141{
 142        struct xs_stored_msg *msg;
 143        char *body;
 144
 145        spin_lock(&xs_state.reply_lock);
 146
 147        while (list_empty(&xs_state.reply_list)) {
 148                spin_unlock(&xs_state.reply_lock);
 149                /* XXX FIXME: Avoid synchronous wait for response here. */
 150                wait_event(xs_state.reply_waitq,
 151                           !list_empty(&xs_state.reply_list));
 152                spin_lock(&xs_state.reply_lock);
 153        }
 154
 155        msg = list_entry(xs_state.reply_list.next,
 156                         struct xs_stored_msg, list);
 157        list_del(&msg->list);
 158
 159        spin_unlock(&xs_state.reply_lock);
 160
 161        *type = msg->hdr.type;
 162        if (len)
 163                *len = msg->hdr.len;
 164        body = msg->u.reply.body;
 165
 166        kfree(msg);
 167
 168        return body;
 169}
 170
 171static void transaction_start(void)
 172{
 173        mutex_lock(&xs_state.transaction_mutex);
 174        atomic_inc(&xs_state.transaction_count);
 175        mutex_unlock(&xs_state.transaction_mutex);
 176}
 177
 178static void transaction_end(void)
 179{
 180        if (atomic_dec_and_test(&xs_state.transaction_count))
 181                wake_up(&xs_state.transaction_wq);
 182}
 183
 184static void transaction_suspend(void)
 185{
 186        mutex_lock(&xs_state.transaction_mutex);
 187        wait_event(xs_state.transaction_wq,
 188                   atomic_read(&xs_state.transaction_count) == 0);
 189}
 190
 191static void transaction_resume(void)
 192{
 193        mutex_unlock(&xs_state.transaction_mutex);
 194}
 195
 196void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
 197{
 198        void *ret;
 199        struct xsd_sockmsg req_msg = *msg;
 200        int err;
 201
 202        if (req_msg.type == XS_TRANSACTION_START)
 203                transaction_start();
 204
 205        mutex_lock(&xs_state.request_mutex);
 206
 207        err = xb_write(msg, sizeof(*msg) + msg->len);
 208        if (err) {
 209                msg->type = XS_ERROR;
 210                ret = ERR_PTR(err);
 211        } else
 212                ret = read_reply(&msg->type, &msg->len);
 213
 214        mutex_unlock(&xs_state.request_mutex);
 215
 216        if ((msg->type == XS_TRANSACTION_END) ||
 217            ((req_msg.type == XS_TRANSACTION_START) &&
 218             (msg->type == XS_ERROR)))
 219                transaction_end();
 220
 221        return ret;
 222}
 223EXPORT_SYMBOL(xenbus_dev_request_and_reply);
 224
 225/* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
 226static void *xs_talkv(struct xenbus_transaction t,
 227                      enum xsd_sockmsg_type type,
 228                      const struct kvec *iovec,
 229                      unsigned int num_vecs,
 230                      unsigned int *len)
 231{
 232        struct xsd_sockmsg msg;
 233        void *ret = NULL;
 234        unsigned int i;
 235        int err;
 236
 237        msg.tx_id = t.id;
 238        msg.req_id = 0;
 239        msg.type = type;
 240        msg.len = 0;
 241        for (i = 0; i < num_vecs; i++)
 242                msg.len += iovec[i].iov_len;
 243
 244        mutex_lock(&xs_state.request_mutex);
 245
 246        err = xb_write(&msg, sizeof(msg));
 247        if (err) {
 248                mutex_unlock(&xs_state.request_mutex);
 249                return ERR_PTR(err);
 250        }
 251
 252        for (i = 0; i < num_vecs; i++) {
 253                err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
 254                if (err) {
 255                        mutex_unlock(&xs_state.request_mutex);
 256                        return ERR_PTR(err);
 257                }
 258        }
 259
 260        ret = read_reply(&msg.type, len);
 261
 262        mutex_unlock(&xs_state.request_mutex);
 263
 264        if (IS_ERR(ret))
 265                return ret;
 266
 267        if (msg.type == XS_ERROR) {
 268                err = get_error(ret);
 269                kfree(ret);
 270                return ERR_PTR(-err);
 271        }
 272
 273        if (msg.type != type) {
 274                if (printk_ratelimit())
 275                        printk(KERN_WARNING
 276                               "XENBUS unexpected type [%d], expected [%d]\n",
 277                               msg.type, type);
 278                kfree(ret);
 279                return ERR_PTR(-EINVAL);
 280        }
 281        return ret;
 282}
 283
 284/* Simplified version of xs_talkv: single message. */
 285static void *xs_single(struct xenbus_transaction t,
 286                       enum xsd_sockmsg_type type,
 287                       const char *string,
 288                       unsigned int *len)
 289{
 290        struct kvec iovec;
 291
 292        iovec.iov_base = (void *)string;
 293        iovec.iov_len = strlen(string) + 1;
 294        return xs_talkv(t, type, &iovec, 1, len);
 295}
 296
 297/* Many commands only need an ack, don't care what it says. */
 298static int xs_error(char *reply)
 299{
 300        if (IS_ERR(reply))
 301                return PTR_ERR(reply);
 302        kfree(reply);
 303        return 0;
 304}
 305
 306static unsigned int count_strings(const char *strings, unsigned int len)
 307{
 308        unsigned int num;
 309        const char *p;
 310
 311        for (p = strings, num = 0; p < strings + len; p += strlen(p) + 1)
 312                num++;
 313
 314        return num;
 315}
 316
 317/* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
 318static char *join(const char *dir, const char *name)
 319{
 320        char *buffer;
 321
 322        if (strlen(name) == 0)
 323                buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s", dir);
 324        else
 325                buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s/%s", dir, name);
 326        return (!buffer) ? ERR_PTR(-ENOMEM) : buffer;
 327}
 328
 329static char **split(char *strings, unsigned int len, unsigned int *num)
 330{
 331        char *p, **ret;
 332
 333        /* Count the strings. */
 334        *num = count_strings(strings, len);
 335
 336        /* Transfer to one big alloc for easy freeing. */
 337        ret = kmalloc(*num * sizeof(char *) + len, GFP_NOIO | __GFP_HIGH);
 338        if (!ret) {
 339                kfree(strings);
 340                return ERR_PTR(-ENOMEM);
 341        }
 342        memcpy(&ret[*num], strings, len);
 343        kfree(strings);
 344
 345        strings = (char *)&ret[*num];
 346        for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
 347                ret[(*num)++] = p;
 348
 349        return ret;
 350}
 351
 352char **xenbus_directory(struct xenbus_transaction t,
 353                        const char *dir, const char *node, unsigned int *num)
 354{
 355        char *strings, *path;
 356        unsigned int len;
 357
 358        path = join(dir, node);
 359        if (IS_ERR(path))
 360                return (char **)path;
 361
 362        strings = xs_single(t, XS_DIRECTORY, path, &len);
 363        kfree(path);
 364        if (IS_ERR(strings))
 365                return (char **)strings;
 366
 367        return split(strings, len, num);
 368}
 369EXPORT_SYMBOL_GPL(xenbus_directory);
 370
 371/* Check if a path exists. Return 1 if it does. */
 372int xenbus_exists(struct xenbus_transaction t,
 373                  const char *dir, const char *node)
 374{
 375        char **d;
 376        int dir_n;
 377
 378        d = xenbus_directory(t, dir, node, &dir_n);
 379        if (IS_ERR(d))
 380                return 0;
 381        kfree(d);
 382        return 1;
 383}
 384EXPORT_SYMBOL_GPL(xenbus_exists);
 385
 386/* Get the value of a single file.
 387 * Returns a kmalloced value: call free() on it after use.
 388 * len indicates length in bytes.
 389 */
 390void *xenbus_read(struct xenbus_transaction t,
 391                  const char *dir, const char *node, unsigned int *len)
 392{
 393        char *path;
 394        void *ret;
 395
 396        path = join(dir, node);
 397        if (IS_ERR(path))
 398                return (void *)path;
 399
 400        ret = xs_single(t, XS_READ, path, len);
 401        kfree(path);
 402        return ret;
 403}
 404EXPORT_SYMBOL_GPL(xenbus_read);
 405
 406/* Write the value of a single file.
 407 * Returns -err on failure.
 408 */
 409int xenbus_write(struct xenbus_transaction t,
 410                 const char *dir, const char *node, const char *string)
 411{
 412        const char *path;
 413        struct kvec iovec[2];
 414        int ret;
 415
 416        path = join(dir, node);
 417        if (IS_ERR(path))
 418                return PTR_ERR(path);
 419
 420        iovec[0].iov_base = (void *)path;
 421        iovec[0].iov_len = strlen(path) + 1;
 422        iovec[1].iov_base = (void *)string;
 423        iovec[1].iov_len = strlen(string);
 424
 425        ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
 426        kfree(path);
 427        return ret;
 428}
 429EXPORT_SYMBOL_GPL(xenbus_write);
 430
 431/* Create a new directory. */
 432int xenbus_mkdir(struct xenbus_transaction t,
 433                 const char *dir, const char *node)
 434{
 435        char *path;
 436        int ret;
 437
 438        path = join(dir, node);
 439        if (IS_ERR(path))
 440                return PTR_ERR(path);
 441
 442        ret = xs_error(xs_single(t, XS_MKDIR, path, NULL));
 443        kfree(path);
 444        return ret;
 445}
 446EXPORT_SYMBOL_GPL(xenbus_mkdir);
 447
 448/* Destroy a file or directory (directories must be empty). */
 449int xenbus_rm(struct xenbus_transaction t, const char *dir, const char *node)
 450{
 451        char *path;
 452        int ret;
 453
 454        path = join(dir, node);
 455        if (IS_ERR(path))
 456                return PTR_ERR(path);
 457
 458        ret = xs_error(xs_single(t, XS_RM, path, NULL));
 459        kfree(path);
 460        return ret;
 461}
 462EXPORT_SYMBOL_GPL(xenbus_rm);
 463
 464/* Start a transaction: changes by others will not be seen during this
 465 * transaction, and changes will not be visible to others until end.
 466 */
 467int xenbus_transaction_start(struct xenbus_transaction *t)
 468{
 469        char *id_str;
 470
 471        transaction_start();
 472
 473        id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
 474        if (IS_ERR(id_str)) {
 475                transaction_end();
 476                return PTR_ERR(id_str);
 477        }
 478
 479        t->id = simple_strtoul(id_str, NULL, 0);
 480        kfree(id_str);
 481        return 0;
 482}
 483EXPORT_SYMBOL_GPL(xenbus_transaction_start);
 484
 485/* End a transaction.
 486 * If abandon is true, transaction is discarded instead of committed.
 487 */
 488int xenbus_transaction_end(struct xenbus_transaction t, int abort)
 489{
 490        char abortstr[2];
 491        int err;
 492
 493        if (abort)
 494                strcpy(abortstr, "F");
 495        else
 496                strcpy(abortstr, "T");
 497
 498        err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
 499
 500        transaction_end();
 501
 502        return err;
 503}
 504EXPORT_SYMBOL_GPL(xenbus_transaction_end);
 505
 506/* Single read and scanf: returns -errno or num scanned. */
 507int xenbus_scanf(struct xenbus_transaction t,
 508                 const char *dir, const char *node, const char *fmt, ...)
 509{
 510        va_list ap;
 511        int ret;
 512        char *val;
 513
 514        val = xenbus_read(t, dir, node, NULL);
 515        if (IS_ERR(val))
 516                return PTR_ERR(val);
 517
 518        va_start(ap, fmt);
 519        ret = vsscanf(val, fmt, ap);
 520        va_end(ap);
 521        kfree(val);
 522        /* Distinctive errno. */
 523        if (ret == 0)
 524                return -ERANGE;
 525        return ret;
 526}
 527EXPORT_SYMBOL_GPL(xenbus_scanf);
 528
 529/* Single printf and write: returns -errno or 0. */
 530int xenbus_printf(struct xenbus_transaction t,
 531                  const char *dir, const char *node, const char *fmt, ...)
 532{
 533        va_list ap;
 534        int ret;
 535#define PRINTF_BUFFER_SIZE 4096
 536        char *printf_buffer;
 537
 538        printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_NOIO | __GFP_HIGH);
 539        if (printf_buffer == NULL)
 540                return -ENOMEM;
 541
 542        va_start(ap, fmt);
 543        ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
 544        va_end(ap);
 545
 546        BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
 547        ret = xenbus_write(t, dir, node, printf_buffer);
 548
 549        kfree(printf_buffer);
 550
 551        return ret;
 552}
 553EXPORT_SYMBOL_GPL(xenbus_printf);
 554
 555/* Takes tuples of names, scanf-style args, and void **, NULL terminated. */
 556int xenbus_gather(struct xenbus_transaction t, const char *dir, ...)
 557{
 558        va_list ap;
 559        const char *name;
 560        int ret = 0;
 561
 562        va_start(ap, dir);
 563        while (ret == 0 && (name = va_arg(ap, char *)) != NULL) {
 564                const char *fmt = va_arg(ap, char *);
 565                void *result = va_arg(ap, void *);
 566                char *p;
 567
 568                p = xenbus_read(t, dir, name, NULL);
 569                if (IS_ERR(p)) {
 570                        ret = PTR_ERR(p);
 571                        break;
 572                }
 573                if (fmt) {
 574                        if (sscanf(p, fmt, result) == 0)
 575                                ret = -EINVAL;
 576                        kfree(p);
 577                } else
 578                        *(char **)result = p;
 579        }
 580        va_end(ap);
 581        return ret;
 582}
 583EXPORT_SYMBOL_GPL(xenbus_gather);
 584
 585static int xs_watch(const char *path, const char *token)
 586{
 587        struct kvec iov[2];
 588
 589        iov[0].iov_base = (void *)path;
 590        iov[0].iov_len = strlen(path) + 1;
 591        iov[1].iov_base = (void *)token;
 592        iov[1].iov_len = strlen(token) + 1;
 593
 594        return xs_error(xs_talkv(XBT_NIL, XS_WATCH, iov,
 595                                 ARRAY_SIZE(iov), NULL));
 596}
 597
 598static int xs_unwatch(const char *path, const char *token)
 599{
 600        struct kvec iov[2];
 601
 602        iov[0].iov_base = (char *)path;
 603        iov[0].iov_len = strlen(path) + 1;
 604        iov[1].iov_base = (char *)token;
 605        iov[1].iov_len = strlen(token) + 1;
 606
 607        return xs_error(xs_talkv(XBT_NIL, XS_UNWATCH, iov,
 608                                 ARRAY_SIZE(iov), NULL));
 609}
 610
 611static struct xenbus_watch *find_watch(const char *token)
 612{
 613        struct xenbus_watch *i, *cmp;
 614
 615        cmp = (void *)simple_strtoul(token, NULL, 16);
 616
 617        list_for_each_entry(i, &watches, list)
 618                if (i == cmp)
 619                        return i;
 620
 621        return NULL;
 622}
 623
 624/* Register callback to watch this node. */
 625int register_xenbus_watch(struct xenbus_watch *watch)
 626{
 627        /* Pointer in ascii is the token. */
 628        char token[sizeof(watch) * 2 + 1];
 629        int err;
 630
 631        sprintf(token, "%lX", (long)watch);
 632
 633        down_read(&xs_state.watch_mutex);
 634
 635        spin_lock(&watches_lock);
 636        BUG_ON(find_watch(token));
 637        list_add(&watch->list, &watches);
 638        spin_unlock(&watches_lock);
 639
 640        err = xs_watch(watch->node, token);
 641
 642        if (err) {
 643                spin_lock(&watches_lock);
 644                list_del(&watch->list);
 645                spin_unlock(&watches_lock);
 646        }
 647
 648        up_read(&xs_state.watch_mutex);
 649
 650        return err;
 651}
 652EXPORT_SYMBOL_GPL(register_xenbus_watch);
 653
 654void unregister_xenbus_watch(struct xenbus_watch *watch)
 655{
 656        struct xs_stored_msg *msg, *tmp;
 657        char token[sizeof(watch) * 2 + 1];
 658        int err;
 659
 660        sprintf(token, "%lX", (long)watch);
 661
 662        down_read(&xs_state.watch_mutex);
 663
 664        spin_lock(&watches_lock);
 665        BUG_ON(!find_watch(token));
 666        list_del(&watch->list);
 667        spin_unlock(&watches_lock);
 668
 669        err = xs_unwatch(watch->node, token);
 670        if (err)
 671                printk(KERN_WARNING
 672                       "XENBUS Failed to release watch %s: %i\n",
 673                       watch->node, err);
 674
 675        up_read(&xs_state.watch_mutex);
 676
 677        /* Make sure there are no callbacks running currently (unless
 678           its us) */
 679        if (current->pid != xenwatch_pid)
 680                mutex_lock(&xenwatch_mutex);
 681
 682        /* Cancel pending watch events. */
 683        spin_lock(&watch_events_lock);
 684        list_for_each_entry_safe(msg, tmp, &watch_events, list) {
 685                if (msg->u.watch.handle != watch)
 686                        continue;
 687                list_del(&msg->list);
 688                kfree(msg->u.watch.vec);
 689                kfree(msg);
 690        }
 691        spin_unlock(&watch_events_lock);
 692
 693        if (current->pid != xenwatch_pid)
 694                mutex_unlock(&xenwatch_mutex);
 695}
 696EXPORT_SYMBOL_GPL(unregister_xenbus_watch);
 697
 698void xs_suspend(void)
 699{
 700        transaction_suspend();
 701        down_write(&xs_state.watch_mutex);
 702        mutex_lock(&xs_state.request_mutex);
 703        mutex_lock(&xs_state.response_mutex);
 704}
 705
 706void xs_resume(void)
 707{
 708        struct xenbus_watch *watch;
 709        char token[sizeof(watch) * 2 + 1];
 710
 711        xb_init_comms();
 712
 713        mutex_unlock(&xs_state.response_mutex);
 714        mutex_unlock(&xs_state.request_mutex);
 715        transaction_resume();
 716
 717        /* No need for watches_lock: the watch_mutex is sufficient. */
 718        list_for_each_entry(watch, &watches, list) {
 719                sprintf(token, "%lX", (long)watch);
 720                xs_watch(watch->node, token);
 721        }
 722
 723        up_write(&xs_state.watch_mutex);
 724}
 725
 726void xs_suspend_cancel(void)
 727{
 728        mutex_unlock(&xs_state.response_mutex);
 729        mutex_unlock(&xs_state.request_mutex);
 730        up_write(&xs_state.watch_mutex);
 731        mutex_unlock(&xs_state.transaction_mutex);
 732}
 733
 734static int xenwatch_thread(void *unused)
 735{
 736        struct list_head *ent;
 737        struct xs_stored_msg *msg;
 738
 739        for (;;) {
 740                wait_event_interruptible(watch_events_waitq,
 741                                         !list_empty(&watch_events));
 742
 743                if (kthread_should_stop())
 744                        break;
 745
 746                mutex_lock(&xenwatch_mutex);
 747
 748                spin_lock(&watch_events_lock);
 749                ent = watch_events.next;
 750                if (ent != &watch_events)
 751                        list_del(ent);
 752                spin_unlock(&watch_events_lock);
 753
 754                if (ent != &watch_events) {
 755                        msg = list_entry(ent, struct xs_stored_msg, list);
 756                        msg->u.watch.handle->callback(
 757                                msg->u.watch.handle,
 758                                (const char **)msg->u.watch.vec,
 759                                msg->u.watch.vec_size);
 760                        kfree(msg->u.watch.vec);
 761                        kfree(msg);
 762                }
 763
 764                mutex_unlock(&xenwatch_mutex);
 765        }
 766
 767        return 0;
 768}
 769
 770static int process_msg(void)
 771{
 772        struct xs_stored_msg *msg;
 773        char *body;
 774        int err;
 775
 776        /*
 777         * We must disallow save/restore while reading a xenstore message.
 778         * A partial read across s/r leaves us out of sync with xenstored.
 779         */
 780        for (;;) {
 781                err = xb_wait_for_data_to_read();
 782                if (err)
 783                        return err;
 784                mutex_lock(&xs_state.response_mutex);
 785                if (xb_data_to_read())
 786                        break;
 787                /* We raced with save/restore: pending data 'disappeared'. */
 788                mutex_unlock(&xs_state.response_mutex);
 789        }
 790
 791
 792        msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH);
 793        if (msg == NULL) {
 794                err = -ENOMEM;
 795                goto out;
 796        }
 797
 798        err = xb_read(&msg->hdr, sizeof(msg->hdr));
 799        if (err) {
 800                kfree(msg);
 801                goto out;
 802        }
 803
 804        body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH);
 805        if (body == NULL) {
 806                kfree(msg);
 807                err = -ENOMEM;
 808                goto out;
 809        }
 810
 811        err = xb_read(body, msg->hdr.len);
 812        if (err) {
 813                kfree(body);
 814                kfree(msg);
 815                goto out;
 816        }
 817        body[msg->hdr.len] = '\0';
 818
 819        if (msg->hdr.type == XS_WATCH_EVENT) {
 820                msg->u.watch.vec = split(body, msg->hdr.len,
 821                                         &msg->u.watch.vec_size);
 822                if (IS_ERR(msg->u.watch.vec)) {
 823                        err = PTR_ERR(msg->u.watch.vec);
 824                        kfree(msg);
 825                        goto out;
 826                }
 827
 828                spin_lock(&watches_lock);
 829                msg->u.watch.handle = find_watch(
 830                        msg->u.watch.vec[XS_WATCH_TOKEN]);
 831                if (msg->u.watch.handle != NULL) {
 832                        spin_lock(&watch_events_lock);
 833                        list_add_tail(&msg->list, &watch_events);
 834                        wake_up(&watch_events_waitq);
 835                        spin_unlock(&watch_events_lock);
 836                } else {
 837                        kfree(msg->u.watch.vec);
 838                        kfree(msg);
 839                }
 840                spin_unlock(&watches_lock);
 841        } else {
 842                msg->u.reply.body = body;
 843                spin_lock(&xs_state.reply_lock);
 844                list_add_tail(&msg->list, &xs_state.reply_list);
 845                spin_unlock(&xs_state.reply_lock);
 846                wake_up(&xs_state.reply_waitq);
 847        }
 848
 849 out:
 850        mutex_unlock(&xs_state.response_mutex);
 851        return err;
 852}
 853
 854static int xenbus_thread(void *unused)
 855{
 856        int err;
 857
 858        for (;;) {
 859                err = process_msg();
 860                if (err)
 861                        printk(KERN_WARNING "XENBUS error %d while reading "
 862                               "message\n", err);
 863                if (kthread_should_stop())
 864                        break;
 865        }
 866
 867        return 0;
 868}
 869
 870int xs_init(void)
 871{
 872        int err;
 873        struct task_struct *task;
 874
 875        INIT_LIST_HEAD(&xs_state.reply_list);
 876        spin_lock_init(&xs_state.reply_lock);
 877        init_waitqueue_head(&xs_state.reply_waitq);
 878
 879        mutex_init(&xs_state.request_mutex);
 880        mutex_init(&xs_state.response_mutex);
 881        mutex_init(&xs_state.transaction_mutex);
 882        init_rwsem(&xs_state.watch_mutex);
 883        atomic_set(&xs_state.transaction_count, 0);
 884        init_waitqueue_head(&xs_state.transaction_wq);
 885
 886        /* Initialize the shared memory rings to talk to xenstored */
 887        err = xb_init_comms();
 888        if (err)
 889                return err;
 890
 891        task = kthread_run(xenwatch_thread, NULL, "xenwatch");
 892        if (IS_ERR(task))
 893                return PTR_ERR(task);
 894        xenwatch_pid = task->pid;
 895
 896        task = kthread_run(xenbus_thread, NULL, "xenbus");
 897        if (IS_ERR(task))
 898                return PTR_ERR(task);
 899
 900        return 0;
 901}
 902