linux/drivers/misc/sgi-xp/xpc_uv.c
<<
>>
Prefs
   1/*
   2 * This file is subject to the terms and conditions of the GNU General Public
   3 * License.  See the file "COPYING" in the main directory of this archive
   4 * for more details.
   5 *
   6 * Copyright (c) 2008-2009 Silicon Graphics, Inc.  All Rights Reserved.
   7 */
   8
   9/*
  10 * Cross Partition Communication (XPC) uv-based functions.
  11 *
  12 *     Architecture specific implementation of common functions.
  13 *
  14 */
  15
  16#include <linux/kernel.h>
  17#include <linux/mm.h>
  18#include <linux/interrupt.h>
  19#include <linux/delay.h>
  20#include <linux/device.h>
  21#include <linux/cpu.h>
  22#include <linux/module.h>
  23#include <linux/err.h>
  24#include <linux/slab.h>
  25#include <asm/uv/uv_hub.h>
  26#if defined CONFIG_X86_64
  27#include <asm/uv/bios.h>
  28#include <asm/uv/uv_irq.h>
  29#elif defined CONFIG_IA64_GENERIC || defined CONFIG_IA64_SGI_UV
  30#include <asm/sn/intr.h>
  31#include <asm/sn/sn_sal.h>
  32#endif
  33#include "../sgi-gru/gru.h"
  34#include "../sgi-gru/grukservices.h"
  35#include "xpc.h"
  36
  37#if defined CONFIG_IA64_GENERIC || defined CONFIG_IA64_SGI_UV
  38struct uv_IO_APIC_route_entry {
  39        __u64   vector          :  8,
  40                delivery_mode   :  3,
  41                dest_mode       :  1,
  42                delivery_status :  1,
  43                polarity        :  1,
  44                __reserved_1    :  1,
  45                trigger         :  1,
  46                mask            :  1,
  47                __reserved_2    : 15,
  48                dest            : 32;
  49};
  50#endif
  51
  52static struct xpc_heartbeat_uv *xpc_heartbeat_uv;
  53
  54#define XPC_ACTIVATE_MSG_SIZE_UV        (1 * GRU_CACHE_LINE_BYTES)
  55#define XPC_ACTIVATE_MQ_SIZE_UV         (4 * XP_MAX_NPARTITIONS_UV * \
  56                                         XPC_ACTIVATE_MSG_SIZE_UV)
  57#define XPC_ACTIVATE_IRQ_NAME           "xpc_activate"
  58
  59#define XPC_NOTIFY_MSG_SIZE_UV          (2 * GRU_CACHE_LINE_BYTES)
  60#define XPC_NOTIFY_MQ_SIZE_UV           (4 * XP_MAX_NPARTITIONS_UV * \
  61                                         XPC_NOTIFY_MSG_SIZE_UV)
  62#define XPC_NOTIFY_IRQ_NAME             "xpc_notify"
  63
  64static int xpc_mq_node = -1;
  65
  66static struct xpc_gru_mq_uv *xpc_activate_mq_uv;
  67static struct xpc_gru_mq_uv *xpc_notify_mq_uv;
  68
  69static int
  70xpc_setup_partitions_uv(void)
  71{
  72        short partid;
  73        struct xpc_partition_uv *part_uv;
  74
  75        for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
  76                part_uv = &xpc_partitions[partid].sn.uv;
  77
  78                mutex_init(&part_uv->cached_activate_gru_mq_desc_mutex);
  79                spin_lock_init(&part_uv->flags_lock);
  80                part_uv->remote_act_state = XPC_P_AS_INACTIVE;
  81        }
  82        return 0;
  83}
  84
  85static void
  86xpc_teardown_partitions_uv(void)
  87{
  88        short partid;
  89        struct xpc_partition_uv *part_uv;
  90        unsigned long irq_flags;
  91
  92        for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
  93                part_uv = &xpc_partitions[partid].sn.uv;
  94
  95                if (part_uv->cached_activate_gru_mq_desc != NULL) {
  96                        mutex_lock(&part_uv->cached_activate_gru_mq_desc_mutex);
  97                        spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
  98                        part_uv->flags &= ~XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
  99                        spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
 100                        kfree(part_uv->cached_activate_gru_mq_desc);
 101                        part_uv->cached_activate_gru_mq_desc = NULL;
 102                        mutex_unlock(&part_uv->
 103                                     cached_activate_gru_mq_desc_mutex);
 104                }
 105        }
 106}
 107
 108static int
 109xpc_get_gru_mq_irq_uv(struct xpc_gru_mq_uv *mq, int cpu, char *irq_name)
 110{
 111        int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);
 112
 113#if defined CONFIG_X86_64
 114        mq->irq = uv_setup_irq(irq_name, cpu, mq->mmr_blade, mq->mmr_offset,
 115                        UV_AFFINITY_CPU);
 116        if (mq->irq < 0)
 117                return mq->irq;
 118
 119        mq->mmr_value = uv_read_global_mmr64(mmr_pnode, mq->mmr_offset);
 120
 121#elif defined CONFIG_IA64_GENERIC || defined CONFIG_IA64_SGI_UV
 122        if (strcmp(irq_name, XPC_ACTIVATE_IRQ_NAME) == 0)
 123                mq->irq = SGI_XPC_ACTIVATE;
 124        else if (strcmp(irq_name, XPC_NOTIFY_IRQ_NAME) == 0)
 125                mq->irq = SGI_XPC_NOTIFY;
 126        else
 127                return -EINVAL;
 128
 129        mq->mmr_value = (unsigned long)cpu_physical_id(cpu) << 32 | mq->irq;
 130        uv_write_global_mmr64(mmr_pnode, mq->mmr_offset, mq->mmr_value);
 131#else
 132        #error not a supported configuration
 133#endif
 134
 135        return 0;
 136}
 137
 138static void
 139xpc_release_gru_mq_irq_uv(struct xpc_gru_mq_uv *mq)
 140{
 141#if defined CONFIG_X86_64
 142        uv_teardown_irq(mq->irq);
 143
 144#elif defined CONFIG_IA64_GENERIC || defined CONFIG_IA64_SGI_UV
 145        int mmr_pnode;
 146        unsigned long mmr_value;
 147
 148        mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);
 149        mmr_value = 1UL << 16;
 150
 151        uv_write_global_mmr64(mmr_pnode, mq->mmr_offset, mmr_value);
 152#else
 153        #error not a supported configuration
 154#endif
 155}
 156
 157static int
 158xpc_gru_mq_watchlist_alloc_uv(struct xpc_gru_mq_uv *mq)
 159{
 160        int ret;
 161
 162#if defined CONFIG_IA64_GENERIC || defined CONFIG_IA64_SGI_UV
 163        int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);
 164
 165        ret = sn_mq_watchlist_alloc(mmr_pnode, (void *)uv_gpa(mq->address),
 166                                    mq->order, &mq->mmr_offset);
 167        if (ret < 0) {
 168                dev_err(xpc_part, "sn_mq_watchlist_alloc() failed, ret=%d\n",
 169                        ret);
 170                return -EBUSY;
 171        }
 172#elif defined CONFIG_X86_64
 173        ret = uv_bios_mq_watchlist_alloc(uv_gpa(mq->address),
 174                                         mq->order, &mq->mmr_offset);
 175        if (ret < 0) {
 176                dev_err(xpc_part, "uv_bios_mq_watchlist_alloc() failed, "
 177                        "ret=%d\n", ret);
 178                return ret;
 179        }
 180#else
 181        #error not a supported configuration
 182#endif
 183
 184        mq->watchlist_num = ret;
 185        return 0;
 186}
 187
 188static void
 189xpc_gru_mq_watchlist_free_uv(struct xpc_gru_mq_uv *mq)
 190{
 191        int ret;
 192        int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);
 193
 194#if defined CONFIG_X86_64
 195        ret = uv_bios_mq_watchlist_free(mmr_pnode, mq->watchlist_num);
 196        BUG_ON(ret != BIOS_STATUS_SUCCESS);
 197#elif defined CONFIG_IA64_GENERIC || defined CONFIG_IA64_SGI_UV
 198        ret = sn_mq_watchlist_free(mmr_pnode, mq->watchlist_num);
 199        BUG_ON(ret != SALRET_OK);
 200#else
 201        #error not a supported configuration
 202#endif
 203}
 204
 205static struct xpc_gru_mq_uv *
 206xpc_create_gru_mq_uv(unsigned int mq_size, int cpu, char *irq_name,
 207                     irq_handler_t irq_handler)
 208{
 209        enum xp_retval xp_ret;
 210        int ret;
 211        int nid;
 212        int nasid;
 213        int pg_order;
 214        struct page *page;
 215        struct xpc_gru_mq_uv *mq;
 216        struct uv_IO_APIC_route_entry *mmr_value;
 217
 218        mq = kmalloc(sizeof(struct xpc_gru_mq_uv), GFP_KERNEL);
 219        if (mq == NULL) {
 220                dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to kmalloc() "
 221                        "a xpc_gru_mq_uv structure\n");
 222                ret = -ENOMEM;
 223                goto out_0;
 224        }
 225
 226        mq->gru_mq_desc = kzalloc(sizeof(struct gru_message_queue_desc),
 227                                  GFP_KERNEL);
 228        if (mq->gru_mq_desc == NULL) {
 229                dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to kmalloc() "
 230                        "a gru_message_queue_desc structure\n");
 231                ret = -ENOMEM;
 232                goto out_1;
 233        }
 234
 235        pg_order = get_order(mq_size);
 236        mq->order = pg_order + PAGE_SHIFT;
 237        mq_size = 1UL << mq->order;
 238
 239        mq->mmr_blade = uv_cpu_to_blade_id(cpu);
 240
 241        nid = cpu_to_node(cpu);
 242        page = __alloc_pages_node(nid,
 243                                      GFP_KERNEL | __GFP_ZERO | __GFP_THISNODE,
 244                                      pg_order);
 245        if (page == NULL) {
 246                dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to alloc %d "
 247                        "bytes of memory on nid=%d for GRU mq\n", mq_size, nid);
 248                ret = -ENOMEM;
 249                goto out_2;
 250        }
 251        mq->address = page_address(page);
 252
 253        /* enable generation of irq when GRU mq operation occurs to this mq */
 254        ret = xpc_gru_mq_watchlist_alloc_uv(mq);
 255        if (ret != 0)
 256                goto out_3;
 257
 258        ret = xpc_get_gru_mq_irq_uv(mq, cpu, irq_name);
 259        if (ret != 0)
 260                goto out_4;
 261
 262        ret = request_irq(mq->irq, irq_handler, 0, irq_name, NULL);
 263        if (ret != 0) {
 264                dev_err(xpc_part, "request_irq(irq=%d) returned error=%d\n",
 265                        mq->irq, -ret);
 266                goto out_5;
 267        }
 268
 269        nasid = UV_PNODE_TO_NASID(uv_cpu_to_pnode(cpu));
 270
 271        mmr_value = (struct uv_IO_APIC_route_entry *)&mq->mmr_value;
 272        ret = gru_create_message_queue(mq->gru_mq_desc, mq->address, mq_size,
 273                                     nasid, mmr_value->vector, mmr_value->dest);
 274        if (ret != 0) {
 275                dev_err(xpc_part, "gru_create_message_queue() returned "
 276                        "error=%d\n", ret);
 277                ret = -EINVAL;
 278                goto out_6;
 279        }
 280
 281        /* allow other partitions to access this GRU mq */
 282        xp_ret = xp_expand_memprotect(xp_pa(mq->address), mq_size);
 283        if (xp_ret != xpSuccess) {
 284                ret = -EACCES;
 285                goto out_6;
 286        }
 287
 288        return mq;
 289
 290        /* something went wrong */
 291out_6:
 292        free_irq(mq->irq, NULL);
 293out_5:
 294        xpc_release_gru_mq_irq_uv(mq);
 295out_4:
 296        xpc_gru_mq_watchlist_free_uv(mq);
 297out_3:
 298        free_pages((unsigned long)mq->address, pg_order);
 299out_2:
 300        kfree(mq->gru_mq_desc);
 301out_1:
 302        kfree(mq);
 303out_0:
 304        return ERR_PTR(ret);
 305}
 306
 307static void
 308xpc_destroy_gru_mq_uv(struct xpc_gru_mq_uv *mq)
 309{
 310        unsigned int mq_size;
 311        int pg_order;
 312        int ret;
 313
 314        /* disallow other partitions to access GRU mq */
 315        mq_size = 1UL << mq->order;
 316        ret = xp_restrict_memprotect(xp_pa(mq->address), mq_size);
 317        BUG_ON(ret != xpSuccess);
 318
 319        /* unregister irq handler and release mq irq/vector mapping */
 320        free_irq(mq->irq, NULL);
 321        xpc_release_gru_mq_irq_uv(mq);
 322
 323        /* disable generation of irq when GRU mq op occurs to this mq */
 324        xpc_gru_mq_watchlist_free_uv(mq);
 325
 326        pg_order = mq->order - PAGE_SHIFT;
 327        free_pages((unsigned long)mq->address, pg_order);
 328
 329        kfree(mq);
 330}
 331
 332static enum xp_retval
 333xpc_send_gru_msg(struct gru_message_queue_desc *gru_mq_desc, void *msg,
 334                 size_t msg_size)
 335{
 336        enum xp_retval xp_ret;
 337        int ret;
 338
 339        while (1) {
 340                ret = gru_send_message_gpa(gru_mq_desc, msg, msg_size);
 341                if (ret == MQE_OK) {
 342                        xp_ret = xpSuccess;
 343                        break;
 344                }
 345
 346                if (ret == MQE_QUEUE_FULL) {
 347                        dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
 348                                "error=MQE_QUEUE_FULL\n");
 349                        /* !!! handle QLimit reached; delay & try again */
 350                        /* ??? Do we add a limit to the number of retries? */
 351                        (void)msleep_interruptible(10);
 352                } else if (ret == MQE_CONGESTION) {
 353                        dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
 354                                "error=MQE_CONGESTION\n");
 355                        /* !!! handle LB Overflow; simply try again */
 356                        /* ??? Do we add a limit to the number of retries? */
 357                } else {
 358                        /* !!! Currently this is MQE_UNEXPECTED_CB_ERR */
 359                        dev_err(xpc_chan, "gru_send_message_gpa() returned "
 360                                "error=%d\n", ret);
 361                        xp_ret = xpGruSendMqError;
 362                        break;
 363                }
 364        }
 365        return xp_ret;
 366}
 367
 368static void
 369xpc_process_activate_IRQ_rcvd_uv(void)
 370{
 371        unsigned long irq_flags;
 372        short partid;
 373        struct xpc_partition *part;
 374        u8 act_state_req;
 375
 376        DBUG_ON(xpc_activate_IRQ_rcvd == 0);
 377
 378        spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 379        for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
 380                part = &xpc_partitions[partid];
 381
 382                if (part->sn.uv.act_state_req == 0)
 383                        continue;
 384
 385                xpc_activate_IRQ_rcvd--;
 386                BUG_ON(xpc_activate_IRQ_rcvd < 0);
 387
 388                act_state_req = part->sn.uv.act_state_req;
 389                part->sn.uv.act_state_req = 0;
 390                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 391
 392                if (act_state_req == XPC_P_ASR_ACTIVATE_UV) {
 393                        if (part->act_state == XPC_P_AS_INACTIVE)
 394                                xpc_activate_partition(part);
 395                        else if (part->act_state == XPC_P_AS_DEACTIVATING)
 396                                XPC_DEACTIVATE_PARTITION(part, xpReactivating);
 397
 398                } else if (act_state_req == XPC_P_ASR_REACTIVATE_UV) {
 399                        if (part->act_state == XPC_P_AS_INACTIVE)
 400                                xpc_activate_partition(part);
 401                        else
 402                                XPC_DEACTIVATE_PARTITION(part, xpReactivating);
 403
 404                } else if (act_state_req == XPC_P_ASR_DEACTIVATE_UV) {
 405                        XPC_DEACTIVATE_PARTITION(part, part->sn.uv.reason);
 406
 407                } else {
 408                        BUG();
 409                }
 410
 411                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 412                if (xpc_activate_IRQ_rcvd == 0)
 413                        break;
 414        }
 415        spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 416
 417}
 418
 419static void
 420xpc_handle_activate_mq_msg_uv(struct xpc_partition *part,
 421                              struct xpc_activate_mq_msghdr_uv *msg_hdr,
 422                              int part_setup,
 423                              int *wakeup_hb_checker)
 424{
 425        unsigned long irq_flags;
 426        struct xpc_partition_uv *part_uv = &part->sn.uv;
 427        struct xpc_openclose_args *args;
 428
 429        part_uv->remote_act_state = msg_hdr->act_state;
 430
 431        switch (msg_hdr->type) {
 432        case XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV:
 433                /* syncing of remote_act_state was just done above */
 434                break;
 435
 436        case XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV: {
 437                struct xpc_activate_mq_msg_activate_req_uv *msg;
 438
 439                /*
 440                 * ??? Do we deal here with ts_jiffies being different
 441                 * ??? if act_state != XPC_P_AS_INACTIVE instead of
 442                 * ??? below?
 443                 */
 444                msg = container_of(msg_hdr, struct
 445                                   xpc_activate_mq_msg_activate_req_uv, hdr);
 446
 447                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 448                if (part_uv->act_state_req == 0)
 449                        xpc_activate_IRQ_rcvd++;
 450                part_uv->act_state_req = XPC_P_ASR_ACTIVATE_UV;
 451                part->remote_rp_pa = msg->rp_gpa; /* !!! _pa is _gpa */
 452                part->remote_rp_ts_jiffies = msg_hdr->rp_ts_jiffies;
 453                part_uv->heartbeat_gpa = msg->heartbeat_gpa;
 454
 455                if (msg->activate_gru_mq_desc_gpa !=
 456                    part_uv->activate_gru_mq_desc_gpa) {
 457                        spin_lock(&part_uv->flags_lock);
 458                        part_uv->flags &= ~XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
 459                        spin_unlock(&part_uv->flags_lock);
 460                        part_uv->activate_gru_mq_desc_gpa =
 461                            msg->activate_gru_mq_desc_gpa;
 462                }
 463                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 464
 465                (*wakeup_hb_checker)++;
 466                break;
 467        }
 468        case XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV: {
 469                struct xpc_activate_mq_msg_deactivate_req_uv *msg;
 470
 471                msg = container_of(msg_hdr, struct
 472                                   xpc_activate_mq_msg_deactivate_req_uv, hdr);
 473
 474                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 475                if (part_uv->act_state_req == 0)
 476                        xpc_activate_IRQ_rcvd++;
 477                part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
 478                part_uv->reason = msg->reason;
 479                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 480
 481                (*wakeup_hb_checker)++;
 482                return;
 483        }
 484        case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV: {
 485                struct xpc_activate_mq_msg_chctl_closerequest_uv *msg;
 486
 487                if (!part_setup)
 488                        break;
 489
 490                msg = container_of(msg_hdr, struct
 491                                   xpc_activate_mq_msg_chctl_closerequest_uv,
 492                                   hdr);
 493                args = &part->remote_openclose_args[msg->ch_number];
 494                args->reason = msg->reason;
 495
 496                spin_lock_irqsave(&part->chctl_lock, irq_flags);
 497                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREQUEST;
 498                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
 499
 500                xpc_wakeup_channel_mgr(part);
 501                break;
 502        }
 503        case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV: {
 504                struct xpc_activate_mq_msg_chctl_closereply_uv *msg;
 505
 506                if (!part_setup)
 507                        break;
 508
 509                msg = container_of(msg_hdr, struct
 510                                   xpc_activate_mq_msg_chctl_closereply_uv,
 511                                   hdr);
 512
 513                spin_lock_irqsave(&part->chctl_lock, irq_flags);
 514                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREPLY;
 515                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
 516
 517                xpc_wakeup_channel_mgr(part);
 518                break;
 519        }
 520        case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV: {
 521                struct xpc_activate_mq_msg_chctl_openrequest_uv *msg;
 522
 523                if (!part_setup)
 524                        break;
 525
 526                msg = container_of(msg_hdr, struct
 527                                   xpc_activate_mq_msg_chctl_openrequest_uv,
 528                                   hdr);
 529                args = &part->remote_openclose_args[msg->ch_number];
 530                args->entry_size = msg->entry_size;
 531                args->local_nentries = msg->local_nentries;
 532
 533                spin_lock_irqsave(&part->chctl_lock, irq_flags);
 534                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREQUEST;
 535                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
 536
 537                xpc_wakeup_channel_mgr(part);
 538                break;
 539        }
 540        case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV: {
 541                struct xpc_activate_mq_msg_chctl_openreply_uv *msg;
 542
 543                if (!part_setup)
 544                        break;
 545
 546                msg = container_of(msg_hdr, struct
 547                                   xpc_activate_mq_msg_chctl_openreply_uv, hdr);
 548                args = &part->remote_openclose_args[msg->ch_number];
 549                args->remote_nentries = msg->remote_nentries;
 550                args->local_nentries = msg->local_nentries;
 551                args->local_msgqueue_pa = msg->notify_gru_mq_desc_gpa;
 552
 553                spin_lock_irqsave(&part->chctl_lock, irq_flags);
 554                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREPLY;
 555                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
 556
 557                xpc_wakeup_channel_mgr(part);
 558                break;
 559        }
 560        case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENCOMPLETE_UV: {
 561                struct xpc_activate_mq_msg_chctl_opencomplete_uv *msg;
 562
 563                if (!part_setup)
 564                        break;
 565
 566                msg = container_of(msg_hdr, struct
 567                                xpc_activate_mq_msg_chctl_opencomplete_uv, hdr);
 568                spin_lock_irqsave(&part->chctl_lock, irq_flags);
 569                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENCOMPLETE;
 570                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
 571
 572                xpc_wakeup_channel_mgr(part);
 573        }
 574        case XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV:
 575                spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
 576                part_uv->flags |= XPC_P_ENGAGED_UV;
 577                spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
 578                break;
 579
 580        case XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV:
 581                spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
 582                part_uv->flags &= ~XPC_P_ENGAGED_UV;
 583                spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
 584                break;
 585
 586        default:
 587                dev_err(xpc_part, "received unknown activate_mq msg type=%d "
 588                        "from partition=%d\n", msg_hdr->type, XPC_PARTID(part));
 589
 590                /* get hb checker to deactivate from the remote partition */
 591                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 592                if (part_uv->act_state_req == 0)
 593                        xpc_activate_IRQ_rcvd++;
 594                part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
 595                part_uv->reason = xpBadMsgType;
 596                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 597
 598                (*wakeup_hb_checker)++;
 599                return;
 600        }
 601
 602        if (msg_hdr->rp_ts_jiffies != part->remote_rp_ts_jiffies &&
 603            part->remote_rp_ts_jiffies != 0) {
 604                /*
 605                 * ??? Does what we do here need to be sensitive to
 606                 * ??? act_state or remote_act_state?
 607                 */
 608                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 609                if (part_uv->act_state_req == 0)
 610                        xpc_activate_IRQ_rcvd++;
 611                part_uv->act_state_req = XPC_P_ASR_REACTIVATE_UV;
 612                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 613
 614                (*wakeup_hb_checker)++;
 615        }
 616}
 617
 618static irqreturn_t
 619xpc_handle_activate_IRQ_uv(int irq, void *dev_id)
 620{
 621        struct xpc_activate_mq_msghdr_uv *msg_hdr;
 622        short partid;
 623        struct xpc_partition *part;
 624        int wakeup_hb_checker = 0;
 625        int part_referenced;
 626
 627        while (1) {
 628                msg_hdr = gru_get_next_message(xpc_activate_mq_uv->gru_mq_desc);
 629                if (msg_hdr == NULL)
 630                        break;
 631
 632                partid = msg_hdr->partid;
 633                if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
 634                        dev_err(xpc_part, "xpc_handle_activate_IRQ_uv() "
 635                                "received invalid partid=0x%x in message\n",
 636                                partid);
 637                } else {
 638                        part = &xpc_partitions[partid];
 639
 640                        part_referenced = xpc_part_ref(part);
 641                        xpc_handle_activate_mq_msg_uv(part, msg_hdr,
 642                                                      part_referenced,
 643                                                      &wakeup_hb_checker);
 644                        if (part_referenced)
 645                                xpc_part_deref(part);
 646                }
 647
 648                gru_free_message(xpc_activate_mq_uv->gru_mq_desc, msg_hdr);
 649        }
 650
 651        if (wakeup_hb_checker)
 652                wake_up_interruptible(&xpc_activate_IRQ_wq);
 653
 654        return IRQ_HANDLED;
 655}
 656
 657static enum xp_retval
 658xpc_cache_remote_gru_mq_desc_uv(struct gru_message_queue_desc *gru_mq_desc,
 659                                unsigned long gru_mq_desc_gpa)
 660{
 661        enum xp_retval ret;
 662
 663        ret = xp_remote_memcpy(uv_gpa(gru_mq_desc), gru_mq_desc_gpa,
 664                               sizeof(struct gru_message_queue_desc));
 665        if (ret == xpSuccess)
 666                gru_mq_desc->mq = NULL;
 667
 668        return ret;
 669}
 670
 671static enum xp_retval
 672xpc_send_activate_IRQ_uv(struct xpc_partition *part, void *msg, size_t msg_size,
 673                         int msg_type)
 674{
 675        struct xpc_activate_mq_msghdr_uv *msg_hdr = msg;
 676        struct xpc_partition_uv *part_uv = &part->sn.uv;
 677        struct gru_message_queue_desc *gru_mq_desc;
 678        unsigned long irq_flags;
 679        enum xp_retval ret;
 680
 681        DBUG_ON(msg_size > XPC_ACTIVATE_MSG_SIZE_UV);
 682
 683        msg_hdr->type = msg_type;
 684        msg_hdr->partid = xp_partition_id;
 685        msg_hdr->act_state = part->act_state;
 686        msg_hdr->rp_ts_jiffies = xpc_rsvd_page->ts_jiffies;
 687
 688        mutex_lock(&part_uv->cached_activate_gru_mq_desc_mutex);
 689again:
 690        if (!(part_uv->flags & XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV)) {
 691                gru_mq_desc = part_uv->cached_activate_gru_mq_desc;
 692                if (gru_mq_desc == NULL) {
 693                        gru_mq_desc = kmalloc(sizeof(struct
 694                                              gru_message_queue_desc),
 695                                              GFP_KERNEL);
 696                        if (gru_mq_desc == NULL) {
 697                                ret = xpNoMemory;
 698                                goto done;
 699                        }
 700                        part_uv->cached_activate_gru_mq_desc = gru_mq_desc;
 701                }
 702
 703                ret = xpc_cache_remote_gru_mq_desc_uv(gru_mq_desc,
 704                                                      part_uv->
 705                                                      activate_gru_mq_desc_gpa);
 706                if (ret != xpSuccess)
 707                        goto done;
 708
 709                spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
 710                part_uv->flags |= XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
 711                spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
 712        }
 713
 714        /* ??? Is holding a spin_lock (ch->lock) during this call a bad idea? */
 715        ret = xpc_send_gru_msg(part_uv->cached_activate_gru_mq_desc, msg,
 716                               msg_size);
 717        if (ret != xpSuccess) {
 718                smp_rmb();      /* ensure a fresh copy of part_uv->flags */
 719                if (!(part_uv->flags & XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV))
 720                        goto again;
 721        }
 722done:
 723        mutex_unlock(&part_uv->cached_activate_gru_mq_desc_mutex);
 724        return ret;
 725}
 726
 727static void
 728xpc_send_activate_IRQ_part_uv(struct xpc_partition *part, void *msg,
 729                              size_t msg_size, int msg_type)
 730{
 731        enum xp_retval ret;
 732
 733        ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
 734        if (unlikely(ret != xpSuccess))
 735                XPC_DEACTIVATE_PARTITION(part, ret);
 736}
 737
 738static void
 739xpc_send_activate_IRQ_ch_uv(struct xpc_channel *ch, unsigned long *irq_flags,
 740                         void *msg, size_t msg_size, int msg_type)
 741{
 742        struct xpc_partition *part = &xpc_partitions[ch->partid];
 743        enum xp_retval ret;
 744
 745        ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
 746        if (unlikely(ret != xpSuccess)) {
 747                if (irq_flags != NULL)
 748                        spin_unlock_irqrestore(&ch->lock, *irq_flags);
 749
 750                XPC_DEACTIVATE_PARTITION(part, ret);
 751
 752                if (irq_flags != NULL)
 753                        spin_lock_irqsave(&ch->lock, *irq_flags);
 754        }
 755}
 756
 757static void
 758xpc_send_local_activate_IRQ_uv(struct xpc_partition *part, int act_state_req)
 759{
 760        unsigned long irq_flags;
 761        struct xpc_partition_uv *part_uv = &part->sn.uv;
 762
 763        /*
 764         * !!! Make our side think that the remote partition sent an activate
 765         * !!! mq message our way by doing what the activate IRQ handler would
 766         * !!! do had one really been sent.
 767         */
 768
 769        spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 770        if (part_uv->act_state_req == 0)
 771                xpc_activate_IRQ_rcvd++;
 772        part_uv->act_state_req = act_state_req;
 773        spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
 774
 775        wake_up_interruptible(&xpc_activate_IRQ_wq);
 776}
 777
 778static enum xp_retval
 779xpc_get_partition_rsvd_page_pa_uv(void *buf, u64 *cookie, unsigned long *rp_pa,
 780                                  size_t *len)
 781{
 782        s64 status;
 783        enum xp_retval ret;
 784
 785#if defined CONFIG_X86_64
 786        status = uv_bios_reserved_page_pa((u64)buf, cookie, (u64 *)rp_pa,
 787                                          (u64 *)len);
 788        if (status == BIOS_STATUS_SUCCESS)
 789                ret = xpSuccess;
 790        else if (status == BIOS_STATUS_MORE_PASSES)
 791                ret = xpNeedMoreInfo;
 792        else
 793                ret = xpBiosError;
 794
 795#elif defined CONFIG_IA64_GENERIC || defined CONFIG_IA64_SGI_UV
 796        status = sn_partition_reserved_page_pa((u64)buf, cookie, rp_pa, len);
 797        if (status == SALRET_OK)
 798                ret = xpSuccess;
 799        else if (status == SALRET_MORE_PASSES)
 800                ret = xpNeedMoreInfo;
 801        else
 802                ret = xpSalError;
 803
 804#else
 805        #error not a supported configuration
 806#endif
 807
 808        return ret;
 809}
 810
 811static int
 812xpc_setup_rsvd_page_uv(struct xpc_rsvd_page *rp)
 813{
 814        xpc_heartbeat_uv =
 815            &xpc_partitions[sn_partition_id].sn.uv.cached_heartbeat;
 816        rp->sn.uv.heartbeat_gpa = uv_gpa(xpc_heartbeat_uv);
 817        rp->sn.uv.activate_gru_mq_desc_gpa =
 818            uv_gpa(xpc_activate_mq_uv->gru_mq_desc);
 819        return 0;
 820}
 821
 822static void
 823xpc_allow_hb_uv(short partid)
 824{
 825}
 826
 827static void
 828xpc_disallow_hb_uv(short partid)
 829{
 830}
 831
 832static void
 833xpc_disallow_all_hbs_uv(void)
 834{
 835}
 836
 837static void
 838xpc_increment_heartbeat_uv(void)
 839{
 840        xpc_heartbeat_uv->value++;
 841}
 842
 843static void
 844xpc_offline_heartbeat_uv(void)
 845{
 846        xpc_increment_heartbeat_uv();
 847        xpc_heartbeat_uv->offline = 1;
 848}
 849
 850static void
 851xpc_online_heartbeat_uv(void)
 852{
 853        xpc_increment_heartbeat_uv();
 854        xpc_heartbeat_uv->offline = 0;
 855}
 856
 857static void
 858xpc_heartbeat_init_uv(void)
 859{
 860        xpc_heartbeat_uv->value = 1;
 861        xpc_heartbeat_uv->offline = 0;
 862}
 863
 864static void
 865xpc_heartbeat_exit_uv(void)
 866{
 867        xpc_offline_heartbeat_uv();
 868}
 869
 870static enum xp_retval
 871xpc_get_remote_heartbeat_uv(struct xpc_partition *part)
 872{
 873        struct xpc_partition_uv *part_uv = &part->sn.uv;
 874        enum xp_retval ret;
 875
 876        ret = xp_remote_memcpy(uv_gpa(&part_uv->cached_heartbeat),
 877                               part_uv->heartbeat_gpa,
 878                               sizeof(struct xpc_heartbeat_uv));
 879        if (ret != xpSuccess)
 880                return ret;
 881
 882        if (part_uv->cached_heartbeat.value == part->last_heartbeat &&
 883            !part_uv->cached_heartbeat.offline) {
 884
 885                ret = xpNoHeartbeat;
 886        } else {
 887                part->last_heartbeat = part_uv->cached_heartbeat.value;
 888        }
 889        return ret;
 890}
 891
 892static void
 893xpc_request_partition_activation_uv(struct xpc_rsvd_page *remote_rp,
 894                                    unsigned long remote_rp_gpa, int nasid)
 895{
 896        short partid = remote_rp->SAL_partid;
 897        struct xpc_partition *part = &xpc_partitions[partid];
 898        struct xpc_activate_mq_msg_activate_req_uv msg;
 899
 900        part->remote_rp_pa = remote_rp_gpa; /* !!! _pa here is really _gpa */
 901        part->remote_rp_ts_jiffies = remote_rp->ts_jiffies;
 902        part->sn.uv.heartbeat_gpa = remote_rp->sn.uv.heartbeat_gpa;
 903        part->sn.uv.activate_gru_mq_desc_gpa =
 904            remote_rp->sn.uv.activate_gru_mq_desc_gpa;
 905
 906        /*
 907         * ??? Is it a good idea to make this conditional on what is
 908         * ??? potentially stale state information?
 909         */
 910        if (part->sn.uv.remote_act_state == XPC_P_AS_INACTIVE) {
 911                msg.rp_gpa = uv_gpa(xpc_rsvd_page);
 912                msg.heartbeat_gpa = xpc_rsvd_page->sn.uv.heartbeat_gpa;
 913                msg.activate_gru_mq_desc_gpa =
 914                    xpc_rsvd_page->sn.uv.activate_gru_mq_desc_gpa;
 915                xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
 916                                           XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV);
 917        }
 918
 919        if (part->act_state == XPC_P_AS_INACTIVE)
 920                xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
 921}
 922
 923static void
 924xpc_request_partition_reactivation_uv(struct xpc_partition *part)
 925{
 926        xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
 927}
 928
 929static void
 930xpc_request_partition_deactivation_uv(struct xpc_partition *part)
 931{
 932        struct xpc_activate_mq_msg_deactivate_req_uv msg;
 933
 934        /*
 935         * ??? Is it a good idea to make this conditional on what is
 936         * ??? potentially stale state information?
 937         */
 938        if (part->sn.uv.remote_act_state != XPC_P_AS_DEACTIVATING &&
 939            part->sn.uv.remote_act_state != XPC_P_AS_INACTIVE) {
 940
 941                msg.reason = part->reason;
 942                xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
 943                                         XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV);
 944        }
 945}
 946
 947static void
 948xpc_cancel_partition_deactivation_request_uv(struct xpc_partition *part)
 949{
 950        /* nothing needs to be done */
 951        return;
 952}
 953
 954static void
 955xpc_init_fifo_uv(struct xpc_fifo_head_uv *head)
 956{
 957        head->first = NULL;
 958        head->last = NULL;
 959        spin_lock_init(&head->lock);
 960        head->n_entries = 0;
 961}
 962
 963static void *
 964xpc_get_fifo_entry_uv(struct xpc_fifo_head_uv *head)
 965{
 966        unsigned long irq_flags;
 967        struct xpc_fifo_entry_uv *first;
 968
 969        spin_lock_irqsave(&head->lock, irq_flags);
 970        first = head->first;
 971        if (head->first != NULL) {
 972                head->first = first->next;
 973                if (head->first == NULL)
 974                        head->last = NULL;
 975
 976                head->n_entries--;
 977                BUG_ON(head->n_entries < 0);
 978
 979                first->next = NULL;
 980        }
 981        spin_unlock_irqrestore(&head->lock, irq_flags);
 982        return first;
 983}
 984
 985static void
 986xpc_put_fifo_entry_uv(struct xpc_fifo_head_uv *head,
 987                      struct xpc_fifo_entry_uv *last)
 988{
 989        unsigned long irq_flags;
 990
 991        last->next = NULL;
 992        spin_lock_irqsave(&head->lock, irq_flags);
 993        if (head->last != NULL)
 994                head->last->next = last;
 995        else
 996                head->first = last;
 997        head->last = last;
 998        head->n_entries++;
 999        spin_unlock_irqrestore(&head->lock, irq_flags);
1000}
1001
1002static int
1003xpc_n_of_fifo_entries_uv(struct xpc_fifo_head_uv *head)
1004{
1005        return head->n_entries;
1006}
1007
1008/*
1009 * Setup the channel structures that are uv specific.
1010 */
1011static enum xp_retval
1012xpc_setup_ch_structures_uv(struct xpc_partition *part)
1013{
1014        struct xpc_channel_uv *ch_uv;
1015        int ch_number;
1016
1017        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
1018                ch_uv = &part->channels[ch_number].sn.uv;
1019
1020                xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
1021                xpc_init_fifo_uv(&ch_uv->recv_msg_list);
1022        }
1023
1024        return xpSuccess;
1025}
1026
/*
 * Tear down the uv specific channel structures. Intentionally empty: no
 * action is taken here.
 */
static void xpc_teardown_ch_structures_uv(struct xpc_partition *part)
{
	/* nothing needs to be done */
}
1036
1037static enum xp_retval
1038xpc_make_first_contact_uv(struct xpc_partition *part)
1039{
1040        struct xpc_activate_mq_msg_uv msg;
1041
1042        /*
1043         * We send a sync msg to get the remote partition's remote_act_state
1044         * updated to our current act_state which at this point should
1045         * be XPC_P_AS_ACTIVATING.
1046         */
1047        xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
1048                                      XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV);
1049
1050        while (!((part->sn.uv.remote_act_state == XPC_P_AS_ACTIVATING) ||
1051                 (part->sn.uv.remote_act_state == XPC_P_AS_ACTIVE))) {
1052
1053                dev_dbg(xpc_part, "waiting to make first contact with "
1054                        "partition %d\n", XPC_PARTID(part));
1055
1056                /* wait a 1/4 of a second or so */
1057                (void)msleep_interruptible(250);
1058
1059                if (part->act_state == XPC_P_AS_DEACTIVATING)
1060                        return part->reason;
1061        }
1062
1063        return xpSuccess;
1064}
1065
1066static u64
1067xpc_get_chctl_all_flags_uv(struct xpc_partition *part)
1068{
1069        unsigned long irq_flags;
1070        union xpc_channel_ctl_flags chctl;
1071
1072        spin_lock_irqsave(&part->chctl_lock, irq_flags);
1073        chctl = part->chctl;
1074        if (chctl.all_flags != 0)
1075                part->chctl.all_flags = 0;
1076
1077        spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
1078        return chctl.all_flags;
1079}
1080
1081static enum xp_retval
1082xpc_allocate_send_msg_slot_uv(struct xpc_channel *ch)
1083{
1084        struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1085        struct xpc_send_msg_slot_uv *msg_slot;
1086        unsigned long irq_flags;
1087        int nentries;
1088        int entry;
1089        size_t nbytes;
1090
1091        for (nentries = ch->local_nentries; nentries > 0; nentries--) {
1092                nbytes = nentries * sizeof(struct xpc_send_msg_slot_uv);
1093                ch_uv->send_msg_slots = kzalloc(nbytes, GFP_KERNEL);
1094                if (ch_uv->send_msg_slots == NULL)
1095                        continue;
1096
1097                for (entry = 0; entry < nentries; entry++) {
1098                        msg_slot = &ch_uv->send_msg_slots[entry];
1099
1100                        msg_slot->msg_slot_number = entry;
1101                        xpc_put_fifo_entry_uv(&ch_uv->msg_slot_free_list,
1102                                              &msg_slot->next);
1103                }
1104
1105                spin_lock_irqsave(&ch->lock, irq_flags);
1106                if (nentries < ch->local_nentries)
1107                        ch->local_nentries = nentries;
1108                spin_unlock_irqrestore(&ch->lock, irq_flags);
1109                return xpSuccess;
1110        }
1111
1112        return xpNoMemory;
1113}
1114
1115static enum xp_retval
1116xpc_allocate_recv_msg_slot_uv(struct xpc_channel *ch)
1117{
1118        struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1119        struct xpc_notify_mq_msg_uv *msg_slot;
1120        unsigned long irq_flags;
1121        int nentries;
1122        int entry;
1123        size_t nbytes;
1124
1125        for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
1126                nbytes = nentries * ch->entry_size;
1127                ch_uv->recv_msg_slots = kzalloc(nbytes, GFP_KERNEL);
1128                if (ch_uv->recv_msg_slots == NULL)
1129                        continue;
1130
1131                for (entry = 0; entry < nentries; entry++) {
1132                        msg_slot = ch_uv->recv_msg_slots +
1133                            entry * ch->entry_size;
1134
1135                        msg_slot->hdr.msg_slot_number = entry;
1136                }
1137
1138                spin_lock_irqsave(&ch->lock, irq_flags);
1139                if (nentries < ch->remote_nentries)
1140                        ch->remote_nentries = nentries;
1141                spin_unlock_irqrestore(&ch->lock, irq_flags);
1142                return xpSuccess;
1143        }
1144
1145        return xpNoMemory;
1146}
1147
1148/*
1149 * Allocate msg_slots associated with the channel.
1150 */
1151static enum xp_retval
1152xpc_setup_msg_structures_uv(struct xpc_channel *ch)
1153{
1154        static enum xp_retval ret;
1155        struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1156
1157        DBUG_ON(ch->flags & XPC_C_SETUP);
1158
1159        ch_uv->cached_notify_gru_mq_desc = kmalloc(sizeof(struct
1160                                                   gru_message_queue_desc),
1161                                                   GFP_KERNEL);
1162        if (ch_uv->cached_notify_gru_mq_desc == NULL)
1163                return xpNoMemory;
1164
1165        ret = xpc_allocate_send_msg_slot_uv(ch);
1166        if (ret == xpSuccess) {
1167
1168                ret = xpc_allocate_recv_msg_slot_uv(ch);
1169                if (ret != xpSuccess) {
1170                        kfree(ch_uv->send_msg_slots);
1171                        xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
1172                }
1173        }
1174        return ret;
1175}
1176
1177/*
1178 * Free up msg_slots and clear other stuff that were setup for the specified
1179 * channel.
1180 */
1181static void
1182xpc_teardown_msg_structures_uv(struct xpc_channel *ch)
1183{
1184        struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1185
1186        DBUG_ON(!spin_is_locked(&ch->lock));
1187
1188        kfree(ch_uv->cached_notify_gru_mq_desc);
1189        ch_uv->cached_notify_gru_mq_desc = NULL;
1190
1191        if (ch->flags & XPC_C_SETUP) {
1192                xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
1193                kfree(ch_uv->send_msg_slots);
1194                xpc_init_fifo_uv(&ch_uv->recv_msg_list);
1195                kfree(ch_uv->recv_msg_slots);
1196        }
1197}
1198
1199static void
1200xpc_send_chctl_closerequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1201{
1202        struct xpc_activate_mq_msg_chctl_closerequest_uv msg;
1203
1204        msg.ch_number = ch->number;
1205        msg.reason = ch->reason;
1206        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1207                                    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV);
1208}
1209
1210static void
1211xpc_send_chctl_closereply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1212{
1213        struct xpc_activate_mq_msg_chctl_closereply_uv msg;
1214
1215        msg.ch_number = ch->number;
1216        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1217                                    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV);
1218}
1219
1220static void
1221xpc_send_chctl_openrequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1222{
1223        struct xpc_activate_mq_msg_chctl_openrequest_uv msg;
1224
1225        msg.ch_number = ch->number;
1226        msg.entry_size = ch->entry_size;
1227        msg.local_nentries = ch->local_nentries;
1228        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1229                                    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV);
1230}
1231
1232static void
1233xpc_send_chctl_openreply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1234{
1235        struct xpc_activate_mq_msg_chctl_openreply_uv msg;
1236
1237        msg.ch_number = ch->number;
1238        msg.local_nentries = ch->local_nentries;
1239        msg.remote_nentries = ch->remote_nentries;
1240        msg.notify_gru_mq_desc_gpa = uv_gpa(xpc_notify_mq_uv->gru_mq_desc);
1241        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1242                                    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV);
1243}
1244
1245static void
1246xpc_send_chctl_opencomplete_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1247{
1248        struct xpc_activate_mq_msg_chctl_opencomplete_uv msg;
1249
1250        msg.ch_number = ch->number;
1251        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1252                                    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENCOMPLETE_UV);
1253}
1254
1255static void
1256xpc_send_chctl_local_msgrequest_uv(struct xpc_partition *part, int ch_number)
1257{
1258        unsigned long irq_flags;
1259
1260        spin_lock_irqsave(&part->chctl_lock, irq_flags);
1261        part->chctl.flags[ch_number] |= XPC_CHCTL_MSGREQUEST;
1262        spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
1263
1264        xpc_wakeup_channel_mgr(part);
1265}
1266
1267static enum xp_retval
1268xpc_save_remote_msgqueue_pa_uv(struct xpc_channel *ch,
1269                               unsigned long gru_mq_desc_gpa)
1270{
1271        struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1272
1273        DBUG_ON(ch_uv->cached_notify_gru_mq_desc == NULL);
1274        return xpc_cache_remote_gru_mq_desc_uv(ch_uv->cached_notify_gru_mq_desc,
1275                                               gru_mq_desc_gpa);
1276}
1277
1278static void
1279xpc_indicate_partition_engaged_uv(struct xpc_partition *part)
1280{
1281        struct xpc_activate_mq_msg_uv msg;
1282
1283        xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
1284                                      XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV);
1285}
1286
1287static void
1288xpc_indicate_partition_disengaged_uv(struct xpc_partition *part)
1289{
1290        struct xpc_activate_mq_msg_uv msg;
1291
1292        xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
1293                                      XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV);
1294}
1295
1296static void
1297xpc_assume_partition_disengaged_uv(short partid)
1298{
1299        struct xpc_partition_uv *part_uv = &xpc_partitions[partid].sn.uv;
1300        unsigned long irq_flags;
1301
1302        spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
1303        part_uv->flags &= ~XPC_P_ENGAGED_UV;
1304        spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
1305}
1306
1307static int
1308xpc_partition_engaged_uv(short partid)
1309{
1310        return (xpc_partitions[partid].sn.uv.flags & XPC_P_ENGAGED_UV) != 0;
1311}
1312
1313static int
1314xpc_any_partition_engaged_uv(void)
1315{
1316        struct xpc_partition_uv *part_uv;
1317        short partid;
1318
1319        for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
1320                part_uv = &xpc_partitions[partid].sn.uv;
1321                if ((part_uv->flags & XPC_P_ENGAGED_UV) != 0)
1322                        return 1;
1323        }
1324        return 0;
1325}
1326
1327static enum xp_retval
1328xpc_allocate_msg_slot_uv(struct xpc_channel *ch, u32 flags,
1329                         struct xpc_send_msg_slot_uv **address_of_msg_slot)
1330{
1331        enum xp_retval ret;
1332        struct xpc_send_msg_slot_uv *msg_slot;
1333        struct xpc_fifo_entry_uv *entry;
1334
1335        while (1) {
1336                entry = xpc_get_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list);
1337                if (entry != NULL)
1338                        break;
1339
1340                if (flags & XPC_NOWAIT)
1341                        return xpNoWait;
1342
1343                ret = xpc_allocate_msg_wait(ch);
1344                if (ret != xpInterrupted && ret != xpTimeout)
1345                        return ret;
1346        }
1347
1348        msg_slot = container_of(entry, struct xpc_send_msg_slot_uv, next);
1349        *address_of_msg_slot = msg_slot;
1350        return xpSuccess;
1351}
1352
1353static void
1354xpc_free_msg_slot_uv(struct xpc_channel *ch,
1355                     struct xpc_send_msg_slot_uv *msg_slot)
1356{
1357        xpc_put_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list, &msg_slot->next);
1358
1359        /* wakeup anyone waiting for a free msg slot */
1360        if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
1361                wake_up(&ch->msg_allocate_wq);
1362}
1363
1364static void
1365xpc_notify_sender_uv(struct xpc_channel *ch,
1366                     struct xpc_send_msg_slot_uv *msg_slot,
1367                     enum xp_retval reason)
1368{
1369        xpc_notify_func func = msg_slot->func;
1370
1371        if (func != NULL && cmpxchg(&msg_slot->func, func, NULL) == func) {
1372
1373                atomic_dec(&ch->n_to_notify);
1374
1375                dev_dbg(xpc_chan, "msg_slot->func() called, msg_slot=0x%p "
1376                        "msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
1377                        msg_slot->msg_slot_number, ch->partid, ch->number);
1378
1379                func(reason, ch->partid, ch->number, msg_slot->key);
1380
1381                dev_dbg(xpc_chan, "msg_slot->func() returned, msg_slot=0x%p "
1382                        "msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
1383                        msg_slot->msg_slot_number, ch->partid, ch->number);
1384        }
1385}
1386
1387static void
1388xpc_handle_notify_mq_ack_uv(struct xpc_channel *ch,
1389                            struct xpc_notify_mq_msg_uv *msg)
1390{
1391        struct xpc_send_msg_slot_uv *msg_slot;
1392        int entry = msg->hdr.msg_slot_number % ch->local_nentries;
1393
1394        msg_slot = &ch->sn.uv.send_msg_slots[entry];
1395
1396        BUG_ON(msg_slot->msg_slot_number != msg->hdr.msg_slot_number);
1397        msg_slot->msg_slot_number += ch->local_nentries;
1398
1399        if (msg_slot->func != NULL)
1400                xpc_notify_sender_uv(ch, msg_slot, xpMsgDelivered);
1401
1402        xpc_free_msg_slot_uv(ch, msg_slot);
1403}
1404
1405static void
1406xpc_handle_notify_mq_msg_uv(struct xpc_partition *part,
1407                            struct xpc_notify_mq_msg_uv *msg)
1408{
1409        struct xpc_partition_uv *part_uv = &part->sn.uv;
1410        struct xpc_channel *ch;
1411        struct xpc_channel_uv *ch_uv;
1412        struct xpc_notify_mq_msg_uv *msg_slot;
1413        unsigned long irq_flags;
1414        int ch_number = msg->hdr.ch_number;
1415
1416        if (unlikely(ch_number >= part->nchannels)) {
1417                dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received invalid "
1418                        "channel number=0x%x in message from partid=%d\n",
1419                        ch_number, XPC_PARTID(part));
1420
1421                /* get hb checker to deactivate from the remote partition */
1422                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
1423                if (part_uv->act_state_req == 0)
1424                        xpc_activate_IRQ_rcvd++;
1425                part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
1426                part_uv->reason = xpBadChannelNumber;
1427                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
1428
1429                wake_up_interruptible(&xpc_activate_IRQ_wq);
1430                return;
1431        }
1432
1433        ch = &part->channels[ch_number];
1434        xpc_msgqueue_ref(ch);
1435
1436        if (!(ch->flags & XPC_C_CONNECTED)) {
1437                xpc_msgqueue_deref(ch);
1438                return;
1439        }
1440
1441        /* see if we're really dealing with an ACK for a previously sent msg */
1442        if (msg->hdr.size == 0) {
1443                xpc_handle_notify_mq_ack_uv(ch, msg);
1444                xpc_msgqueue_deref(ch);
1445                return;
1446        }
1447
1448        /* we're dealing with a normal message sent via the notify_mq */
1449        ch_uv = &ch->sn.uv;
1450
1451        msg_slot = ch_uv->recv_msg_slots +
1452            (msg->hdr.msg_slot_number % ch->remote_nentries) * ch->entry_size;
1453
1454        BUG_ON(msg_slot->hdr.size != 0);
1455
1456        memcpy(msg_slot, msg, msg->hdr.size);
1457
1458        xpc_put_fifo_entry_uv(&ch_uv->recv_msg_list, &msg_slot->hdr.u.next);
1459
1460        if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
1461                /*
1462                 * If there is an existing idle kthread get it to deliver
1463                 * the payload, otherwise we'll have to get the channel mgr
1464                 * for this partition to create a kthread to do the delivery.
1465                 */
1466                if (atomic_read(&ch->kthreads_idle) > 0)
1467                        wake_up_nr(&ch->idle_wq, 1);
1468                else
1469                        xpc_send_chctl_local_msgrequest_uv(part, ch->number);
1470        }
1471        xpc_msgqueue_deref(ch);
1472}
1473
1474static irqreturn_t
1475xpc_handle_notify_IRQ_uv(int irq, void *dev_id)
1476{
1477        struct xpc_notify_mq_msg_uv *msg;
1478        short partid;
1479        struct xpc_partition *part;
1480
1481        while ((msg = gru_get_next_message(xpc_notify_mq_uv->gru_mq_desc)) !=
1482               NULL) {
1483
1484                partid = msg->hdr.partid;
1485                if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
1486                        dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received "
1487                                "invalid partid=0x%x in message\n", partid);
1488                } else {
1489                        part = &xpc_partitions[partid];
1490
1491                        if (xpc_part_ref(part)) {
1492                                xpc_handle_notify_mq_msg_uv(part, msg);
1493                                xpc_part_deref(part);
1494                        }
1495                }
1496
1497                gru_free_message(xpc_notify_mq_uv->gru_mq_desc, msg);
1498        }
1499
1500        return IRQ_HANDLED;
1501}
1502
1503static int
1504xpc_n_of_deliverable_payloads_uv(struct xpc_channel *ch)
1505{
1506        return xpc_n_of_fifo_entries_uv(&ch->sn.uv.recv_msg_list);
1507}
1508
1509static void
1510xpc_process_msg_chctl_flags_uv(struct xpc_partition *part, int ch_number)
1511{
1512        struct xpc_channel *ch = &part->channels[ch_number];
1513        int ndeliverable_payloads;
1514
1515        xpc_msgqueue_ref(ch);
1516
1517        ndeliverable_payloads = xpc_n_of_deliverable_payloads_uv(ch);
1518
1519        if (ndeliverable_payloads > 0 &&
1520            (ch->flags & XPC_C_CONNECTED) &&
1521            (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)) {
1522
1523                xpc_activate_kthreads(ch, ndeliverable_payloads);
1524        }
1525
1526        xpc_msgqueue_deref(ch);
1527}
1528
1529static enum xp_retval
1530xpc_send_payload_uv(struct xpc_channel *ch, u32 flags, void *payload,
1531                    u16 payload_size, u8 notify_type, xpc_notify_func func,
1532                    void *key)
1533{
1534        enum xp_retval ret = xpSuccess;
1535        struct xpc_send_msg_slot_uv *msg_slot = NULL;
1536        struct xpc_notify_mq_msg_uv *msg;
1537        u8 msg_buffer[XPC_NOTIFY_MSG_SIZE_UV];
1538        size_t msg_size;
1539
1540        DBUG_ON(notify_type != XPC_N_CALL);
1541
1542        msg_size = sizeof(struct xpc_notify_mq_msghdr_uv) + payload_size;
1543        if (msg_size > ch->entry_size)
1544                return xpPayloadTooBig;
1545
1546        xpc_msgqueue_ref(ch);
1547
1548        if (ch->flags & XPC_C_DISCONNECTING) {
1549                ret = ch->reason;
1550                goto out_1;
1551        }
1552        if (!(ch->flags & XPC_C_CONNECTED)) {
1553                ret = xpNotConnected;
1554                goto out_1;
1555        }
1556
1557        ret = xpc_allocate_msg_slot_uv(ch, flags, &msg_slot);
1558        if (ret != xpSuccess)
1559                goto out_1;
1560
1561        if (func != NULL) {
1562                atomic_inc(&ch->n_to_notify);
1563
1564                msg_slot->key = key;
1565                smp_wmb(); /* a non-NULL func must hit memory after the key */
1566                msg_slot->func = func;
1567
1568                if (ch->flags & XPC_C_DISCONNECTING) {
1569                        ret = ch->reason;
1570                        goto out_2;
1571                }
1572        }
1573
1574        msg = (struct xpc_notify_mq_msg_uv *)&msg_buffer;
1575        msg->hdr.partid = xp_partition_id;
1576        msg->hdr.ch_number = ch->number;
1577        msg->hdr.size = msg_size;
1578        msg->hdr.msg_slot_number = msg_slot->msg_slot_number;
1579        memcpy(&msg->payload, payload, payload_size);
1580
1581        ret = xpc_send_gru_msg(ch->sn.uv.cached_notify_gru_mq_desc, msg,
1582                               msg_size);
1583        if (ret == xpSuccess)
1584                goto out_1;
1585
1586        XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
1587out_2:
1588        if (func != NULL) {
1589                /*
1590                 * Try to NULL the msg_slot's func field. If we fail, then
1591                 * xpc_notify_senders_of_disconnect_uv() beat us to it, in which
1592                 * case we need to pretend we succeeded to send the message
1593                 * since the user will get a callout for the disconnect error
1594                 * by xpc_notify_senders_of_disconnect_uv(), and to also get an
1595                 * error returned here will confuse them. Additionally, since
1596                 * in this case the channel is being disconnected we don't need
1597                 * to put the the msg_slot back on the free list.
1598                 */
1599                if (cmpxchg(&msg_slot->func, func, NULL) != func) {
1600                        ret = xpSuccess;
1601                        goto out_1;
1602                }
1603
1604                msg_slot->key = NULL;
1605                atomic_dec(&ch->n_to_notify);
1606        }
1607        xpc_free_msg_slot_uv(ch, msg_slot);
1608out_1:
1609        xpc_msgqueue_deref(ch);
1610        return ret;
1611}
1612
1613/*
1614 * Tell the callers of xpc_send_notify() that the status of their payloads
1615 * is unknown because the channel is now disconnecting.
1616 *
1617 * We don't worry about putting these msg_slots on the free list since the
1618 * msg_slots themselves are about to be kfree'd.
1619 */
1620static void
1621xpc_notify_senders_of_disconnect_uv(struct xpc_channel *ch)
1622{
1623        struct xpc_send_msg_slot_uv *msg_slot;
1624        int entry;
1625
1626        DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
1627
1628        for (entry = 0; entry < ch->local_nentries; entry++) {
1629
1630                if (atomic_read(&ch->n_to_notify) == 0)
1631                        break;
1632
1633                msg_slot = &ch->sn.uv.send_msg_slots[entry];
1634                if (msg_slot->func != NULL)
1635                        xpc_notify_sender_uv(ch, msg_slot, ch->reason);
1636        }
1637}
1638
1639/*
1640 * Get the next deliverable message's payload.
1641 */
1642static void *
1643xpc_get_deliverable_payload_uv(struct xpc_channel *ch)
1644{
1645        struct xpc_fifo_entry_uv *entry;
1646        struct xpc_notify_mq_msg_uv *msg;
1647        void *payload = NULL;
1648
1649        if (!(ch->flags & XPC_C_DISCONNECTING)) {
1650                entry = xpc_get_fifo_entry_uv(&ch->sn.uv.recv_msg_list);
1651                if (entry != NULL) {
1652                        msg = container_of(entry, struct xpc_notify_mq_msg_uv,
1653                                           hdr.u.next);
1654                        payload = &msg->payload;
1655                }
1656        }
1657        return payload;
1658}
1659
1660static void
1661xpc_received_payload_uv(struct xpc_channel *ch, void *payload)
1662{
1663        struct xpc_notify_mq_msg_uv *msg;
1664        enum xp_retval ret;
1665
1666        msg = container_of(payload, struct xpc_notify_mq_msg_uv, payload);
1667
1668        /* return an ACK to the sender of this message */
1669
1670        msg->hdr.partid = xp_partition_id;
1671        msg->hdr.size = 0;      /* size of zero indicates this is an ACK */
1672
1673        ret = xpc_send_gru_msg(ch->sn.uv.cached_notify_gru_mq_desc, msg,
1674                               sizeof(struct xpc_notify_mq_msghdr_uv));
1675        if (ret != xpSuccess)
1676                XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
1677}
1678
1679static struct xpc_arch_operations xpc_arch_ops_uv = {
1680        .setup_partitions = xpc_setup_partitions_uv,
1681        .teardown_partitions = xpc_teardown_partitions_uv,
1682        .process_activate_IRQ_rcvd = xpc_process_activate_IRQ_rcvd_uv,
1683        .get_partition_rsvd_page_pa = xpc_get_partition_rsvd_page_pa_uv,
1684        .setup_rsvd_page = xpc_setup_rsvd_page_uv,
1685
1686        .allow_hb = xpc_allow_hb_uv,
1687        .disallow_hb = xpc_disallow_hb_uv,
1688        .disallow_all_hbs = xpc_disallow_all_hbs_uv,
1689        .increment_heartbeat = xpc_increment_heartbeat_uv,
1690        .offline_heartbeat = xpc_offline_heartbeat_uv,
1691        .online_heartbeat = xpc_online_heartbeat_uv,
1692        .heartbeat_init = xpc_heartbeat_init_uv,
1693        .heartbeat_exit = xpc_heartbeat_exit_uv,
1694        .get_remote_heartbeat = xpc_get_remote_heartbeat_uv,
1695
1696        .request_partition_activation =
1697                xpc_request_partition_activation_uv,
1698        .request_partition_reactivation =
1699                xpc_request_partition_reactivation_uv,
1700        .request_partition_deactivation =
1701                xpc_request_partition_deactivation_uv,
1702        .cancel_partition_deactivation_request =
1703                xpc_cancel_partition_deactivation_request_uv,
1704
1705        .setup_ch_structures = xpc_setup_ch_structures_uv,
1706        .teardown_ch_structures = xpc_teardown_ch_structures_uv,
1707
1708        .make_first_contact = xpc_make_first_contact_uv,
1709
1710        .get_chctl_all_flags = xpc_get_chctl_all_flags_uv,
1711        .send_chctl_closerequest = xpc_send_chctl_closerequest_uv,
1712        .send_chctl_closereply = xpc_send_chctl_closereply_uv,
1713        .send_chctl_openrequest = xpc_send_chctl_openrequest_uv,
1714        .send_chctl_openreply = xpc_send_chctl_openreply_uv,
1715        .send_chctl_opencomplete = xpc_send_chctl_opencomplete_uv,
1716        .process_msg_chctl_flags = xpc_process_msg_chctl_flags_uv,
1717
1718        .save_remote_msgqueue_pa = xpc_save_remote_msgqueue_pa_uv,
1719
1720        .setup_msg_structures = xpc_setup_msg_structures_uv,
1721        .teardown_msg_structures = xpc_teardown_msg_structures_uv,
1722
1723        .indicate_partition_engaged = xpc_indicate_partition_engaged_uv,
1724        .indicate_partition_disengaged = xpc_indicate_partition_disengaged_uv,
1725        .assume_partition_disengaged = xpc_assume_partition_disengaged_uv,
1726        .partition_engaged = xpc_partition_engaged_uv,
1727        .any_partition_engaged = xpc_any_partition_engaged_uv,
1728
1729        .n_of_deliverable_payloads = xpc_n_of_deliverable_payloads_uv,
1730        .send_payload = xpc_send_payload_uv,
1731        .get_deliverable_payload = xpc_get_deliverable_payload_uv,
1732        .received_payload = xpc_received_payload_uv,
1733        .notify_senders_of_disconnect = xpc_notify_senders_of_disconnect_uv,
1734};
1735
1736static int
1737xpc_init_mq_node(int nid)
1738{
1739        int cpu;
1740
1741        get_online_cpus();
1742
1743        for_each_cpu(cpu, cpumask_of_node(nid)) {
1744                xpc_activate_mq_uv =
1745                        xpc_create_gru_mq_uv(XPC_ACTIVATE_MQ_SIZE_UV, nid,
1746                                             XPC_ACTIVATE_IRQ_NAME,
1747                                             xpc_handle_activate_IRQ_uv);
1748                if (!IS_ERR(xpc_activate_mq_uv))
1749                        break;
1750        }
1751        if (IS_ERR(xpc_activate_mq_uv)) {
1752                put_online_cpus();
1753                return PTR_ERR(xpc_activate_mq_uv);
1754        }
1755
1756        for_each_cpu(cpu, cpumask_of_node(nid)) {
1757                xpc_notify_mq_uv =
1758                        xpc_create_gru_mq_uv(XPC_NOTIFY_MQ_SIZE_UV, nid,
1759                                             XPC_NOTIFY_IRQ_NAME,
1760                                             xpc_handle_notify_IRQ_uv);
1761                if (!IS_ERR(xpc_notify_mq_uv))
1762                        break;
1763        }
1764        if (IS_ERR(xpc_notify_mq_uv)) {
1765                xpc_destroy_gru_mq_uv(xpc_activate_mq_uv);
1766                put_online_cpus();
1767                return PTR_ERR(xpc_notify_mq_uv);
1768        }
1769
1770        put_online_cpus();
1771        return 0;
1772}
1773
1774int
1775xpc_init_uv(void)
1776{
1777        int nid;
1778        int ret = 0;
1779
1780        xpc_arch_ops = xpc_arch_ops_uv;
1781
1782        if (sizeof(struct xpc_notify_mq_msghdr_uv) > XPC_MSG_HDR_MAX_SIZE) {
1783                dev_err(xpc_part, "xpc_notify_mq_msghdr_uv is larger than %d\n",
1784                        XPC_MSG_HDR_MAX_SIZE);
1785                return -E2BIG;
1786        }
1787
1788        if (xpc_mq_node < 0)
1789                for_each_online_node(nid) {
1790                        ret = xpc_init_mq_node(nid);
1791
1792                        if (!ret)
1793                                break;
1794                }
1795        else
1796                ret = xpc_init_mq_node(xpc_mq_node);
1797
1798        if (ret < 0)
1799                dev_err(xpc_part, "xpc_init_mq_node() returned error=%d\n",
1800                        -ret);
1801
1802        return ret;
1803}
1804
1805void
1806xpc_exit_uv(void)
1807{
1808        xpc_destroy_gru_mq_uv(xpc_notify_mq_uv);
1809        xpc_destroy_gru_mq_uv(xpc_activate_mq_uv);
1810}
1811
1812module_param(xpc_mq_node, int, 0);
1813MODULE_PARM_DESC(xpc_mq_node, "Node number on which to allocate message queues.");
1814