linux/drivers/xen/xenbus/xenbus_xs.c
<<
>>
Prefs
   1/******************************************************************************
   2 * xenbus_xs.c
   3 *
   4 * This is the kernel equivalent of the "xs" library.  We don't need everything
   5 * and we use xenbus_comms for communication.
   6 *
   7 * Copyright (C) 2005 Rusty Russell, IBM Corporation
   8 *
   9 * This program is free software; you can redistribute it and/or
  10 * modify it under the terms of the GNU General Public License version 2
  11 * as published by the Free Software Foundation; or, when distributed
  12 * separately from the Linux kernel or incorporated into other
  13 * software packages, subject to the following license:
  14 *
  15 * Permission is hereby granted, free of charge, to any person obtaining a copy
  16 * of this source file (the "Software"), to deal in the Software without
  17 * restriction, including without limitation the rights to use, copy, modify,
  18 * merge, publish, distribute, sublicense, and/or sell copies of the Software,
  19 * and to permit persons to whom the Software is furnished to do so, subject to
  20 * the following conditions:
  21 *
  22 * The above copyright notice and this permission notice shall be included in
  23 * all copies or substantial portions of the Software.
  24 *
  25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  26 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  27 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  28 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  29 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  30 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  31 * IN THE SOFTWARE.
  32 */
  33
  34#include <linux/unistd.h>
  35#include <linux/errno.h>
  36#include <linux/types.h>
  37#include <linux/uio.h>
  38#include <linux/kernel.h>
  39#include <linux/string.h>
  40#include <linux/err.h>
  41#include <linux/slab.h>
  42#include <linux/fcntl.h>
  43#include <linux/kthread.h>
  44#include <linux/rwsem.h>
  45#include <linux/module.h>
  46#include <linux/mutex.h>
  47#include <xen/xenbus.h>
  48#include "xenbus_comms.h"
  49
/*
 * A message received from xenstored by the reader thread (process_msg()).
 * hdr.type selects which member of the union is valid: XS_WATCH_EVENT
 * messages use u.watch and are queued on watch_events; everything else is
 * a reply, uses u.reply and is queued on xs_state.reply_list.
 */
struct xs_stored_msg {
	/* Link in either xs_state.reply_list or watch_events. */
	struct list_head list;

	/* Wire header as read from xenstored. */
	struct xsd_sockmsg hdr;

	union {
		/* Queued replies. */
		struct {
			/* kmalloc'ed, NUL-terminated payload handed to read_reply()'s caller. */
			char *body;
		} reply;

		/* Queued watch events. */
		struct {
			/* Registered watch the event's token resolved to. */
			struct xenbus_watch *handle;
			/* Payload strings from split(): one allocation, freed with one kfree(). */
			char **vec;
			/* Number of entries in vec. */
			unsigned int vec_size;
		} watch;
	} u;
};
  69
/* Client-side state for the connection to xenstored. */
struct xs_handle {
	/* A list of replies. Currently only one will ever be outstanding. */
	struct list_head reply_list;
	/* Protects reply_list. */
	spinlock_t reply_lock;
	/* Woken by the reader thread when a reply is queued. */
	wait_queue_head_t reply_waitq;

	/*
	 * Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex
	 * -> response_mutex. xs_suspend() takes all four in that order.
	 * Outside of suspend, response_mutex is taken only by the reader
	 * thread (process_msg()), which holds none of the other three.
	 */

	/* One request at a time. */
	struct mutex request_mutex;

	/* Protect xenbus reader thread against save/restore. */
	struct mutex response_mutex;

	/*
	 * Protect transactions against save/restore: held for read by each
	 * open transaction, for write by xs_suspend().
	 */
	struct rw_semaphore transaction_mutex;

	/*
	 * Protect watch (de)register against save/restore: held for read
	 * while (un)registering, for write by xs_suspend()/xs_resume().
	 */
	struct rw_semaphore watch_mutex;
};
  93
/* Connection state shared by every xenstore request in this file. */
static struct xs_handle xs_state;

/*
 * List of registered watches, and a lock to protect it. Entries are added
 * by register_xenbus_watch() and looked up by token in find_watch().
 */
static LIST_HEAD(watches);
static DEFINE_SPINLOCK(watches_lock);

/*
 * List of pending watch callback events (queued by process_msg()), and a
 * lock to protect it.
 */
static LIST_HEAD(watch_events);
static DEFINE_SPINLOCK(watch_events_lock);

/*
 * Details of the xenwatch callback kernel thread. The thread waits on the
 * watch_events_waitq for work to do (queued on watch_events list). When it
 * wakes up it acquires the xenwatch_mutex before reading the list and
 * carrying out work. unregister_xenbus_watch() takes xenwatch_mutex to wait
 * for a running callback, and compares against xenwatch_pid to avoid
 * taking it when it is itself running inside a callback.
 */
static pid_t xenwatch_pid;
static DEFINE_MUTEX(xenwatch_mutex);
static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
 113
 114static int get_error(const char *errorstring)
 115{
 116        unsigned int i;
 117
 118        for (i = 0; strcmp(errorstring, xsd_errors[i].errstring) != 0; i++) {
 119                if (i == ARRAY_SIZE(xsd_errors) - 1) {
 120                        printk(KERN_WARNING
 121                               "XENBUS xen store gave: unknown error %s",
 122                               errorstring);
 123                        return EINVAL;
 124                }
 125        }
 126        return xsd_errors[i].errnum;
 127}
 128
 129static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
 130{
 131        struct xs_stored_msg *msg;
 132        char *body;
 133
 134        spin_lock(&xs_state.reply_lock);
 135
 136        while (list_empty(&xs_state.reply_list)) {
 137                spin_unlock(&xs_state.reply_lock);
 138                /* XXX FIXME: Avoid synchronous wait for response here. */
 139                wait_event(xs_state.reply_waitq,
 140                           !list_empty(&xs_state.reply_list));
 141                spin_lock(&xs_state.reply_lock);
 142        }
 143
 144        msg = list_entry(xs_state.reply_list.next,
 145                         struct xs_stored_msg, list);
 146        list_del(&msg->list);
 147
 148        spin_unlock(&xs_state.reply_lock);
 149
 150        *type = msg->hdr.type;
 151        if (len)
 152                *len = msg->hdr.len;
 153        body = msg->u.reply.body;
 154
 155        kfree(msg);
 156
 157        return body;
 158}
 159
 160void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
 161{
 162        void *ret;
 163        struct xsd_sockmsg req_msg = *msg;
 164        int err;
 165
 166        if (req_msg.type == XS_TRANSACTION_START)
 167                down_read(&xs_state.transaction_mutex);
 168
 169        mutex_lock(&xs_state.request_mutex);
 170
 171        err = xb_write(msg, sizeof(*msg) + msg->len);
 172        if (err) {
 173                msg->type = XS_ERROR;
 174                ret = ERR_PTR(err);
 175        } else
 176                ret = read_reply(&msg->type, &msg->len);
 177
 178        mutex_unlock(&xs_state.request_mutex);
 179
 180        if ((msg->type == XS_TRANSACTION_END) ||
 181            ((req_msg.type == XS_TRANSACTION_START) &&
 182             (msg->type == XS_ERROR)))
 183                up_read(&xs_state.transaction_mutex);
 184
 185        return ret;
 186}
 187EXPORT_SYMBOL(xenbus_dev_request_and_reply);
 188
 189/* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
 190static void *xs_talkv(struct xenbus_transaction t,
 191                      enum xsd_sockmsg_type type,
 192                      const struct kvec *iovec,
 193                      unsigned int num_vecs,
 194                      unsigned int *len)
 195{
 196        struct xsd_sockmsg msg;
 197        void *ret = NULL;
 198        unsigned int i;
 199        int err;
 200
 201        msg.tx_id = t.id;
 202        msg.req_id = 0;
 203        msg.type = type;
 204        msg.len = 0;
 205        for (i = 0; i < num_vecs; i++)
 206                msg.len += iovec[i].iov_len;
 207
 208        mutex_lock(&xs_state.request_mutex);
 209
 210        err = xb_write(&msg, sizeof(msg));
 211        if (err) {
 212                mutex_unlock(&xs_state.request_mutex);
 213                return ERR_PTR(err);
 214        }
 215
 216        for (i = 0; i < num_vecs; i++) {
 217                err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
 218                if (err) {
 219                        mutex_unlock(&xs_state.request_mutex);
 220                        return ERR_PTR(err);
 221                }
 222        }
 223
 224        ret = read_reply(&msg.type, len);
 225
 226        mutex_unlock(&xs_state.request_mutex);
 227
 228        if (IS_ERR(ret))
 229                return ret;
 230
 231        if (msg.type == XS_ERROR) {
 232                err = get_error(ret);
 233                kfree(ret);
 234                return ERR_PTR(-err);
 235        }
 236
 237        if (msg.type != type) {
 238                if (printk_ratelimit())
 239                        printk(KERN_WARNING
 240                               "XENBUS unexpected type [%d], expected [%d]\n",
 241                               msg.type, type);
 242                kfree(ret);
 243                return ERR_PTR(-EINVAL);
 244        }
 245        return ret;
 246}
 247
 248/* Simplified version of xs_talkv: single message. */
 249static void *xs_single(struct xenbus_transaction t,
 250                       enum xsd_sockmsg_type type,
 251                       const char *string,
 252                       unsigned int *len)
 253{
 254        struct kvec iovec;
 255
 256        iovec.iov_base = (void *)string;
 257        iovec.iov_len = strlen(string) + 1;
 258        return xs_talkv(t, type, &iovec, 1, len);
 259}
 260
 261/* Many commands only need an ack, don't care what it says. */
 262static int xs_error(char *reply)
 263{
 264        if (IS_ERR(reply))
 265                return PTR_ERR(reply);
 266        kfree(reply);
 267        return 0;
 268}
 269
 270static unsigned int count_strings(const char *strings, unsigned int len)
 271{
 272        unsigned int num;
 273        const char *p;
 274
 275        for (p = strings, num = 0; p < strings + len; p += strlen(p) + 1)
 276                num++;
 277
 278        return num;
 279}
 280
 281/* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
 282static char *join(const char *dir, const char *name)
 283{
 284        char *buffer;
 285
 286        if (strlen(name) == 0)
 287                buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s", dir);
 288        else
 289                buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s/%s", dir, name);
 290        return (!buffer) ? ERR_PTR(-ENOMEM) : buffer;
 291}
 292
 293static char **split(char *strings, unsigned int len, unsigned int *num)
 294{
 295        char *p, **ret;
 296
 297        /* Count the strings. */
 298        *num = count_strings(strings, len);
 299
 300        /* Transfer to one big alloc for easy freeing. */
 301        ret = kmalloc(*num * sizeof(char *) + len, GFP_NOIO | __GFP_HIGH);
 302        if (!ret) {
 303                kfree(strings);
 304                return ERR_PTR(-ENOMEM);
 305        }
 306        memcpy(&ret[*num], strings, len);
 307        kfree(strings);
 308
 309        strings = (char *)&ret[*num];
 310        for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
 311                ret[(*num)++] = p;
 312
 313        return ret;
 314}
 315
 316char **xenbus_directory(struct xenbus_transaction t,
 317                        const char *dir, const char *node, unsigned int *num)
 318{
 319        char *strings, *path;
 320        unsigned int len;
 321
 322        path = join(dir, node);
 323        if (IS_ERR(path))
 324                return (char **)path;
 325
 326        strings = xs_single(t, XS_DIRECTORY, path, &len);
 327        kfree(path);
 328        if (IS_ERR(strings))
 329                return (char **)strings;
 330
 331        return split(strings, len, num);
 332}
 333EXPORT_SYMBOL_GPL(xenbus_directory);
 334
 335/* Check if a path exists. Return 1 if it does. */
 336int xenbus_exists(struct xenbus_transaction t,
 337                  const char *dir, const char *node)
 338{
 339        char **d;
 340        int dir_n;
 341
 342        d = xenbus_directory(t, dir, node, &dir_n);
 343        if (IS_ERR(d))
 344                return 0;
 345        kfree(d);
 346        return 1;
 347}
 348EXPORT_SYMBOL_GPL(xenbus_exists);
 349
 350/* Get the value of a single file.
 351 * Returns a kmalloced value: call free() on it after use.
 352 * len indicates length in bytes.
 353 */
 354void *xenbus_read(struct xenbus_transaction t,
 355                  const char *dir, const char *node, unsigned int *len)
 356{
 357        char *path;
 358        void *ret;
 359
 360        path = join(dir, node);
 361        if (IS_ERR(path))
 362                return (void *)path;
 363
 364        ret = xs_single(t, XS_READ, path, len);
 365        kfree(path);
 366        return ret;
 367}
 368EXPORT_SYMBOL_GPL(xenbus_read);
 369
 370/* Write the value of a single file.
 371 * Returns -err on failure.
 372 */
 373int xenbus_write(struct xenbus_transaction t,
 374                 const char *dir, const char *node, const char *string)
 375{
 376        const char *path;
 377        struct kvec iovec[2];
 378        int ret;
 379
 380        path = join(dir, node);
 381        if (IS_ERR(path))
 382                return PTR_ERR(path);
 383
 384        iovec[0].iov_base = (void *)path;
 385        iovec[0].iov_len = strlen(path) + 1;
 386        iovec[1].iov_base = (void *)string;
 387        iovec[1].iov_len = strlen(string);
 388
 389        ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
 390        kfree(path);
 391        return ret;
 392}
 393EXPORT_SYMBOL_GPL(xenbus_write);
 394
 395/* Create a new directory. */
 396int xenbus_mkdir(struct xenbus_transaction t,
 397                 const char *dir, const char *node)
 398{
 399        char *path;
 400        int ret;
 401
 402        path = join(dir, node);
 403        if (IS_ERR(path))
 404                return PTR_ERR(path);
 405
 406        ret = xs_error(xs_single(t, XS_MKDIR, path, NULL));
 407        kfree(path);
 408        return ret;
 409}
 410EXPORT_SYMBOL_GPL(xenbus_mkdir);
 411
 412/* Destroy a file or directory (directories must be empty). */
 413int xenbus_rm(struct xenbus_transaction t, const char *dir, const char *node)
 414{
 415        char *path;
 416        int ret;
 417
 418        path = join(dir, node);
 419        if (IS_ERR(path))
 420                return PTR_ERR(path);
 421
 422        ret = xs_error(xs_single(t, XS_RM, path, NULL));
 423        kfree(path);
 424        return ret;
 425}
 426EXPORT_SYMBOL_GPL(xenbus_rm);
 427
 428/* Start a transaction: changes by others will not be seen during this
 429 * transaction, and changes will not be visible to others until end.
 430 */
 431int xenbus_transaction_start(struct xenbus_transaction *t)
 432{
 433        char *id_str;
 434
 435        down_read(&xs_state.transaction_mutex);
 436
 437        id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
 438        if (IS_ERR(id_str)) {
 439                up_read(&xs_state.transaction_mutex);
 440                return PTR_ERR(id_str);
 441        }
 442
 443        t->id = simple_strtoul(id_str, NULL, 0);
 444        kfree(id_str);
 445        return 0;
 446}
 447EXPORT_SYMBOL_GPL(xenbus_transaction_start);
 448
 449/* End a transaction.
 450 * If abandon is true, transaction is discarded instead of committed.
 451 */
 452int xenbus_transaction_end(struct xenbus_transaction t, int abort)
 453{
 454        char abortstr[2];
 455        int err;
 456
 457        if (abort)
 458                strcpy(abortstr, "F");
 459        else
 460                strcpy(abortstr, "T");
 461
 462        err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
 463
 464        up_read(&xs_state.transaction_mutex);
 465
 466        return err;
 467}
 468EXPORT_SYMBOL_GPL(xenbus_transaction_end);
 469
 470/* Single read and scanf: returns -errno or num scanned. */
 471int xenbus_scanf(struct xenbus_transaction t,
 472                 const char *dir, const char *node, const char *fmt, ...)
 473{
 474        va_list ap;
 475        int ret;
 476        char *val;
 477
 478        val = xenbus_read(t, dir, node, NULL);
 479        if (IS_ERR(val))
 480                return PTR_ERR(val);
 481
 482        va_start(ap, fmt);
 483        ret = vsscanf(val, fmt, ap);
 484        va_end(ap);
 485        kfree(val);
 486        /* Distinctive errno. */
 487        if (ret == 0)
 488                return -ERANGE;
 489        return ret;
 490}
 491EXPORT_SYMBOL_GPL(xenbus_scanf);
 492
 493/* Single printf and write: returns -errno or 0. */
 494int xenbus_printf(struct xenbus_transaction t,
 495                  const char *dir, const char *node, const char *fmt, ...)
 496{
 497        va_list ap;
 498        int ret;
 499#define PRINTF_BUFFER_SIZE 4096
 500        char *printf_buffer;
 501
 502        printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
 503        if (printf_buffer == NULL)
 504                return -ENOMEM;
 505
 506        va_start(ap, fmt);
 507        ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
 508        va_end(ap);
 509
 510        BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
 511        ret = xenbus_write(t, dir, node, printf_buffer);
 512
 513        kfree(printf_buffer);
 514
 515        return ret;
 516}
 517EXPORT_SYMBOL_GPL(xenbus_printf);
 518
 519/* Takes tuples of names, scanf-style args, and void **, NULL terminated. */
 520int xenbus_gather(struct xenbus_transaction t, const char *dir, ...)
 521{
 522        va_list ap;
 523        const char *name;
 524        int ret = 0;
 525
 526        va_start(ap, dir);
 527        while (ret == 0 && (name = va_arg(ap, char *)) != NULL) {
 528                const char *fmt = va_arg(ap, char *);
 529                void *result = va_arg(ap, void *);
 530                char *p;
 531
 532                p = xenbus_read(t, dir, name, NULL);
 533                if (IS_ERR(p)) {
 534                        ret = PTR_ERR(p);
 535                        break;
 536                }
 537                if (fmt) {
 538                        if (sscanf(p, fmt, result) == 0)
 539                                ret = -EINVAL;
 540                        kfree(p);
 541                } else
 542                        *(char **)result = p;
 543        }
 544        va_end(ap);
 545        return ret;
 546}
 547EXPORT_SYMBOL_GPL(xenbus_gather);
 548
 549static int xs_watch(const char *path, const char *token)
 550{
 551        struct kvec iov[2];
 552
 553        iov[0].iov_base = (void *)path;
 554        iov[0].iov_len = strlen(path) + 1;
 555        iov[1].iov_base = (void *)token;
 556        iov[1].iov_len = strlen(token) + 1;
 557
 558        return xs_error(xs_talkv(XBT_NIL, XS_WATCH, iov,
 559                                 ARRAY_SIZE(iov), NULL));
 560}
 561
 562static int xs_unwatch(const char *path, const char *token)
 563{
 564        struct kvec iov[2];
 565
 566        iov[0].iov_base = (char *)path;
 567        iov[0].iov_len = strlen(path) + 1;
 568        iov[1].iov_base = (char *)token;
 569        iov[1].iov_len = strlen(token) + 1;
 570
 571        return xs_error(xs_talkv(XBT_NIL, XS_UNWATCH, iov,
 572                                 ARRAY_SIZE(iov), NULL));
 573}
 574
 575static struct xenbus_watch *find_watch(const char *token)
 576{
 577        struct xenbus_watch *i, *cmp;
 578
 579        cmp = (void *)simple_strtoul(token, NULL, 16);
 580
 581        list_for_each_entry(i, &watches, list)
 582                if (i == cmp)
 583                        return i;
 584
 585        return NULL;
 586}
 587
 588/* Register callback to watch this node. */
 589int register_xenbus_watch(struct xenbus_watch *watch)
 590{
 591        /* Pointer in ascii is the token. */
 592        char token[sizeof(watch) * 2 + 1];
 593        int err;
 594
 595        sprintf(token, "%lX", (long)watch);
 596
 597        down_read(&xs_state.watch_mutex);
 598
 599        spin_lock(&watches_lock);
 600        BUG_ON(find_watch(token));
 601        list_add(&watch->list, &watches);
 602        spin_unlock(&watches_lock);
 603
 604        err = xs_watch(watch->node, token);
 605
 606        /* Ignore errors due to multiple registration. */
 607        if ((err != 0) && (err != -EEXIST)) {
 608                spin_lock(&watches_lock);
 609                list_del(&watch->list);
 610                spin_unlock(&watches_lock);
 611        }
 612
 613        up_read(&xs_state.watch_mutex);
 614
 615        return err;
 616}
 617EXPORT_SYMBOL_GPL(register_xenbus_watch);
 618
 619void unregister_xenbus_watch(struct xenbus_watch *watch)
 620{
 621        struct xs_stored_msg *msg, *tmp;
 622        char token[sizeof(watch) * 2 + 1];
 623        int err;
 624
 625        sprintf(token, "%lX", (long)watch);
 626
 627        down_read(&xs_state.watch_mutex);
 628
 629        spin_lock(&watches_lock);
 630        BUG_ON(!find_watch(token));
 631        list_del(&watch->list);
 632        spin_unlock(&watches_lock);
 633
 634        err = xs_unwatch(watch->node, token);
 635        if (err)
 636                printk(KERN_WARNING
 637                       "XENBUS Failed to release watch %s: %i\n",
 638                       watch->node, err);
 639
 640        up_read(&xs_state.watch_mutex);
 641
 642        /* Make sure there are no callbacks running currently (unless
 643           its us) */
 644        if (current->pid != xenwatch_pid)
 645                mutex_lock(&xenwatch_mutex);
 646
 647        /* Cancel pending watch events. */
 648        spin_lock(&watch_events_lock);
 649        list_for_each_entry_safe(msg, tmp, &watch_events, list) {
 650                if (msg->u.watch.handle != watch)
 651                        continue;
 652                list_del(&msg->list);
 653                kfree(msg->u.watch.vec);
 654                kfree(msg);
 655        }
 656        spin_unlock(&watch_events_lock);
 657
 658        if (current->pid != xenwatch_pid)
 659                mutex_unlock(&xenwatch_mutex);
 660}
 661EXPORT_SYMBOL_GPL(unregister_xenbus_watch);
 662
/*
 * Quiesce xenstore traffic before save/restore. Takes every xs_state lock
 * in the documented order and returns with all four held:
 * - the write locks wait for open transactions and in-progress watch
 *   (un)registrations;
 * - request_mutex waits for an in-flight request;
 * - response_mutex waits for the reader thread to finish a message.
 * The locks are released by xs_resume() or xs_suspend_cancel().
 */
void xs_suspend(void)
{
	down_write(&xs_state.transaction_mutex);
	down_write(&xs_state.watch_mutex);
	mutex_lock(&xs_state.request_mutex);
	mutex_lock(&xs_state.response_mutex);
}
 670
/*
 * Counterpart of xs_suspend() after save/restore. Re-initialises the
 * xenstore comms and releases the request/response/transaction locks. It
 * then re-sends an XS_WATCH for every watch on the watches list, and only
 * afterwards releases watch_mutex, so no watch can be (un)registered while
 * that is in progress. Re-registration is presumably needed because the
 * restored xenstored no longer knows this domain's watches (confirm).
 * NOTE(review): the results of xb_init_comms() and xs_watch() are ignored.
 */
void xs_resume(void)
{
	struct xenbus_watch *watch;
	/* Hex token buffer, sized as in register_xenbus_watch(). */
	char token[sizeof(watch) * 2 + 1];

	xb_init_comms();

	/* Release in reverse acquisition order; watch_mutex is kept for now. */
	mutex_unlock(&xs_state.response_mutex);
	mutex_unlock(&xs_state.request_mutex);
	up_write(&xs_state.transaction_mutex);

	/* No need for watches_lock: the watch_mutex is sufficient. */
	list_for_each_entry(watch, &watches, list) {
		sprintf(token, "%lX", (long)watch);
		xs_watch(watch->node, token);
	}

	up_write(&xs_state.watch_mutex);
}
 690
/*
 * Abort a suspend started with xs_suspend(): release all four xs_state
 * locks in reverse acquisition order, without re-initialising the comms or
 * re-registering watches.
 */
void xs_suspend_cancel(void)
{
	mutex_unlock(&xs_state.response_mutex);
	mutex_unlock(&xs_state.request_mutex);
	up_write(&xs_state.watch_mutex);
	up_write(&xs_state.transaction_mutex);
}
 698
 699static int xenwatch_thread(void *unused)
 700{
 701        struct list_head *ent;
 702        struct xs_stored_msg *msg;
 703
 704        for (;;) {
 705                wait_event_interruptible(watch_events_waitq,
 706                                         !list_empty(&watch_events));
 707
 708                if (kthread_should_stop())
 709                        break;
 710
 711                mutex_lock(&xenwatch_mutex);
 712
 713                spin_lock(&watch_events_lock);
 714                ent = watch_events.next;
 715                if (ent != &watch_events)
 716                        list_del(ent);
 717                spin_unlock(&watch_events_lock);
 718
 719                if (ent != &watch_events) {
 720                        msg = list_entry(ent, struct xs_stored_msg, list);
 721                        msg->u.watch.handle->callback(
 722                                msg->u.watch.handle,
 723                                (const char **)msg->u.watch.vec,
 724                                msg->u.watch.vec_size);
 725                        kfree(msg->u.watch.vec);
 726                        kfree(msg);
 727                }
 728
 729                mutex_unlock(&xenwatch_mutex);
 730        }
 731
 732        return 0;
 733}
 734
 735static int process_msg(void)
 736{
 737        struct xs_stored_msg *msg;
 738        char *body;
 739        int err;
 740
 741        /*
 742         * We must disallow save/restore while reading a xenstore message.
 743         * A partial read across s/r leaves us out of sync with xenstored.
 744         */
 745        for (;;) {
 746                err = xb_wait_for_data_to_read();
 747                if (err)
 748                        return err;
 749                mutex_lock(&xs_state.response_mutex);
 750                if (xb_data_to_read())
 751                        break;
 752                /* We raced with save/restore: pending data 'disappeared'. */
 753                mutex_unlock(&xs_state.response_mutex);
 754        }
 755
 756
 757        msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH);
 758        if (msg == NULL) {
 759                err = -ENOMEM;
 760                goto out;
 761        }
 762
 763        err = xb_read(&msg->hdr, sizeof(msg->hdr));
 764        if (err) {
 765                kfree(msg);
 766                goto out;
 767        }
 768
 769        body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH);
 770        if (body == NULL) {
 771                kfree(msg);
 772                err = -ENOMEM;
 773                goto out;
 774        }
 775
 776        err = xb_read(body, msg->hdr.len);
 777        if (err) {
 778                kfree(body);
 779                kfree(msg);
 780                goto out;
 781        }
 782        body[msg->hdr.len] = '\0';
 783
 784        if (msg->hdr.type == XS_WATCH_EVENT) {
 785                msg->u.watch.vec = split(body, msg->hdr.len,
 786                                         &msg->u.watch.vec_size);
 787                if (IS_ERR(msg->u.watch.vec)) {
 788                        err = PTR_ERR(msg->u.watch.vec);
 789                        kfree(msg);
 790                        goto out;
 791                }
 792
 793                spin_lock(&watches_lock);
 794                msg->u.watch.handle = find_watch(
 795                        msg->u.watch.vec[XS_WATCH_TOKEN]);
 796                if (msg->u.watch.handle != NULL) {
 797                        spin_lock(&watch_events_lock);
 798                        list_add_tail(&msg->list, &watch_events);
 799                        wake_up(&watch_events_waitq);
 800                        spin_unlock(&watch_events_lock);
 801                } else {
 802                        kfree(msg->u.watch.vec);
 803                        kfree(msg);
 804                }
 805                spin_unlock(&watches_lock);
 806        } else {
 807                msg->u.reply.body = body;
 808                spin_lock(&xs_state.reply_lock);
 809                list_add_tail(&msg->list, &xs_state.reply_list);
 810                spin_unlock(&xs_state.reply_lock);
 811                wake_up(&xs_state.reply_waitq);
 812        }
 813
 814 out:
 815        mutex_unlock(&xs_state.response_mutex);
 816        return err;
 817}
 818
 819static int xenbus_thread(void *unused)
 820{
 821        int err;
 822
 823        for (;;) {
 824                err = process_msg();
 825                if (err)
 826                        printk(KERN_WARNING "XENBUS error %d while reading "
 827                               "message\n", err);
 828                if (kthread_should_stop())
 829                        break;
 830        }
 831
 832        return 0;
 833}
 834
 835int xs_init(void)
 836{
 837        int err;
 838        struct task_struct *task;
 839
 840        INIT_LIST_HEAD(&xs_state.reply_list);
 841        spin_lock_init(&xs_state.reply_lock);
 842        init_waitqueue_head(&xs_state.reply_waitq);
 843
 844        mutex_init(&xs_state.request_mutex);
 845        mutex_init(&xs_state.response_mutex);
 846        init_rwsem(&xs_state.transaction_mutex);
 847        init_rwsem(&xs_state.watch_mutex);
 848
 849        /* Initialize the shared memory rings to talk to xenstored */
 850        err = xb_init_comms();
 851        if (err)
 852                return err;
 853
 854        task = kthread_run(xenwatch_thread, NULL, "xenwatch");
 855        if (IS_ERR(task))
 856                return PTR_ERR(task);
 857        xenwatch_pid = task->pid;
 858
 859        task = kthread_run(xenbus_thread, NULL, "xenbus");
 860        if (IS_ERR(task))
 861                return PTR_ERR(task);
 862
 863        return 0;
 864}
 865