linux/drivers/misc/mic/scif/scif_nodeqp.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/*
   3 * Intel MIC Platform Software Stack (MPSS)
   4 *
   5 * Copyright(c) 2014 Intel Corporation.
   6 *
   7 * Intel SCIF driver.
   8 */
   9#include "../bus/scif_bus.h"
  10#include "scif_peer_bus.h"
  11#include "scif_main.h"
  12#include "scif_nodeqp.h"
  13#include "scif_map.h"
  14
  15/*
  16 ************************************************************************
  17 * SCIF node Queue Pair (QP) setup flow:
  18 *
  19 * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
  20 * 2) scif_setup_qp(..) allocates the local qp and calls
  21 *      scif_setup_qp_connect(..) which allocates and maps the local
  22 *      buffer for the inbound QP
  23 * 3) The local node updates the device page with the DMA address of the QP
  24 * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
  25 *      the peer node has updated its QP DMA address
  26 * 5) Once a valid non zero address is found in the QP DMA address field
  27 *      in the device page, the local node maps the remote node's QP,
  28 *      updates its outbound QP and sends a SCIF_INIT message to the peer
  29 * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom
  30 *      half handler by calling scif_init(..)
  31 * 7) scif_init(..) registers a new SCIF peer node by calling
  32 *      scif_peer_register_device(..) which signifies the addition of a new
  33 *      SCIF node
  34 * 8) On the mgmt node, P2P network setup/teardown is initiated if all the
  35 *      remote nodes are online via scif_p2p_setup(..)
  36 * 9) For P2P setup, the host maps the remote nodes' aperture and memory
  37 *      bars and sends a SCIF_NODE_ADD message to both nodes
  38 * 10) As part of scif_nodeadd, both nodes set up their local inbound
  39 *      QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
  40 * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
  41 *      SCIF_NODE_ADD_ACK to the remote nodes
  42 * 12) As part of scif_node_add_ack(..) the remote nodes update their
  43 *      outbound QPs, make sure they can access memory on the remote node
  44 *      and then add a new SCIF peer node by calling
  45 *      scif_peer_register_device(..) which signifies the addition of a new
  46 *      SCIF node.
  47 * 13) The SCIF network is now established across all nodes.
  48 *
  49 ************************************************************************
  50 * SCIF node QP teardown flow (initiated by non mgmt node):
  51 *
  52 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
  53 * 2) The device page QP DMA address field is updated with 0x0
  54 * 3) A non mgmt node now cleans up all local data structures and sends a
  55 *      SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
  56 * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
  57 * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
  58 *      peers and waits for a SCIF_NODE_REMOVE_ACK
  59 * 6) As part of scif_node_remove(..) a remote node unregisters the peer
  60 *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
  61 * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
  62 *      it sends itself a node remove message whose handling cleans up local
  63 *      data structures and unregisters the peer node from the SCIF network
  64 * 8) The mgmt node sends a SCIF_EXIT_ACK
  65 * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
  66 *      completes the SCIF remove routine
  67 * 10) The SCIF network is now torn down for the node initiating the
  68 *      teardown sequence
  69 *
  70 ************************************************************************
  71 * SCIF node QP teardown flow (initiated by mgmt node):
  72 *
  73 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
  74 * 2) The device page QP DMA address field is updated with 0x0
  75 * 3) The mgmt node calls scif_disconnect_node(..)
  76 * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
  77 *      and waits for a SCIF_NODE_REMOVE_ACK
  78 * 5) As part of scif_node_remove(..) a remote node unregisters the peer
  79 *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
  80 * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
  81 *      it unregisters the peer node from the SCIF network
  82 * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
  83 * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
  84 *      which would clean up local data structures for all SCIF nodes and
  85 *      then send a SCIF_EXIT_ACK back to the mgmt node
  86 * 9) Upon receipt of the SCIF_EXIT_ACK the mgmt node sends itself a node
  87 *      remove message whose handling cleans up local data structures and
  88 *      destroys any P2P mappings.
  89 * 10) The SCIF hardware device for which a remove callback was received is now
  90 *      disconnected from the SCIF network.
  91 */
  92/*
  93 * Initializes "local" data structures for the QP. Allocates the QP
  94 * ring buffer (rb) and initializes the "in bound" queue.
  95 */
  96int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
  97                          int local_size, struct scif_dev *scifdev)
  98{
  99        void *local_q = qp->inbound_q.rb_base;
 100        int err = 0;
 101        u32 tmp_rd = 0;
 102
 103        spin_lock_init(&qp->send_lock);
 104        spin_lock_init(&qp->recv_lock);
 105
 106        /* Allocate rb only if not already allocated */
 107        if (!local_q) {
 108                local_q = kzalloc(local_size, GFP_KERNEL);
 109                if (!local_q) {
 110                        err = -ENOMEM;
 111                        return err;
 112                }
 113        }
 114
 115        err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
 116        if (err)
 117                goto kfree;
 118        /*
 119         * To setup the inbound_q, the buffer lives locally, the read pointer
 120         * is remote and the write pointer is local.
 121         */
 122        scif_rb_init(&qp->inbound_q,
 123                     &tmp_rd,
 124                     &qp->local_write,
 125                     local_q, get_count_order(local_size));
 126        /*
 127         * The read pointer is NULL initially and it is unsafe to use the ring
 128         * buffer til this changes!
 129         */
 130        qp->inbound_q.read_ptr = NULL;
 131        err = scif_map_single(qp_offset, qp,
 132                              scifdev, sizeof(struct scif_qp));
 133        if (err)
 134                goto unmap;
 135        qp->local_qp = *qp_offset;
 136        return err;
 137unmap:
 138        scif_unmap_single(qp->local_buf, scifdev, local_size);
 139        qp->local_buf = 0;
 140kfree:
 141        kfree(local_q);
 142        return err;
 143}
 144
 145/* When the other side has already done it's allocation, this is called */
 146int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
 147                         dma_addr_t phys, int local_size,
 148                         struct scif_dev *scifdev)
 149{
 150        void *local_q;
 151        void *remote_q;
 152        struct scif_qp *remote_qp;
 153        int remote_size;
 154        int err = 0;
 155
 156        spin_lock_init(&qp->send_lock);
 157        spin_lock_init(&qp->recv_lock);
 158        /* Start by figuring out where we need to point */
 159        remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
 160        if (!remote_qp)
 161                return -EIO;
 162        qp->remote_qp = remote_qp;
 163        if (qp->remote_qp->magic != SCIFEP_MAGIC) {
 164                err = -EIO;
 165                goto iounmap;
 166        }
 167        qp->remote_buf = remote_qp->local_buf;
 168        remote_size = qp->remote_qp->inbound_q.size;
 169        remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
 170        if (!remote_q) {
 171                err = -EIO;
 172                goto iounmap;
 173        }
 174        qp->remote_qp->local_write = 0;
 175        /*
 176         * To setup the outbound_q, the buffer lives in remote memory,
 177         * the read pointer is local, the write pointer is remote
 178         */
 179        scif_rb_init(&qp->outbound_q,
 180                     &qp->local_read,
 181                     &qp->remote_qp->local_write,
 182                     remote_q,
 183                     get_count_order(remote_size));
 184        local_q = kzalloc(local_size, GFP_KERNEL);
 185        if (!local_q) {
 186                err = -ENOMEM;
 187                goto iounmap_1;
 188        }
 189        err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
 190        if (err)
 191                goto kfree;
 192        qp->remote_qp->local_read = 0;
 193        /*
 194         * To setup the inbound_q, the buffer lives locally, the read pointer
 195         * is remote and the write pointer is local
 196         */
 197        scif_rb_init(&qp->inbound_q,
 198                     &qp->remote_qp->local_read,
 199                     &qp->local_write,
 200                     local_q, get_count_order(local_size));
 201        err = scif_map_single(qp_offset, qp, scifdev,
 202                              sizeof(struct scif_qp));
 203        if (err)
 204                goto unmap;
 205        qp->local_qp = *qp_offset;
 206        return err;
 207unmap:
 208        scif_unmap_single(qp->local_buf, scifdev, local_size);
 209        qp->local_buf = 0;
 210kfree:
 211        kfree(local_q);
 212iounmap_1:
 213        scif_iounmap(remote_q, remote_size, scifdev);
 214        qp->outbound_q.rb_base = NULL;
 215iounmap:
 216        scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
 217        qp->remote_qp = NULL;
 218        return err;
 219}
 220
 221int scif_setup_qp_connect_response(struct scif_dev *scifdev,
 222                                   struct scif_qp *qp, u64 payload)
 223{
 224        int err = 0;
 225        void *r_buf;
 226        int remote_size;
 227        phys_addr_t tmp_phys;
 228
 229        qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);
 230
 231        if (!qp->remote_qp) {
 232                err = -ENOMEM;
 233                goto error;
 234        }
 235
 236        if (qp->remote_qp->magic != SCIFEP_MAGIC) {
 237                dev_err(&scifdev->sdev->dev,
 238                        "SCIFEP_MAGIC mismatch between self %d remote %d\n",
 239                        scif_dev[scif_info.nodeid].node, scifdev->node);
 240                err = -ENODEV;
 241                goto error;
 242        }
 243
 244        tmp_phys = qp->remote_qp->local_buf;
 245        remote_size = qp->remote_qp->inbound_q.size;
 246        r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);
 247
 248        if (!r_buf)
 249                return -EIO;
 250
 251        qp->local_read = 0;
 252        scif_rb_init(&qp->outbound_q,
 253                     &qp->local_read,
 254                     &qp->remote_qp->local_write,
 255                     r_buf,
 256                     get_count_order(remote_size));
 257        /*
 258         * Because the node QP may already be processing an INIT message, set
 259         * the read pointer so the cached read offset isn't lost
 260         */
 261        qp->remote_qp->local_read = qp->inbound_q.current_read_offset;
 262        /*
 263         * resetup the inbound_q now that we know where the
 264         * inbound_read really is.
 265         */
 266        scif_rb_init(&qp->inbound_q,
 267                     &qp->remote_qp->local_read,
 268                     &qp->local_write,
 269                     qp->inbound_q.rb_base,
 270                     get_count_order(qp->inbound_q.size));
 271error:
 272        return err;
 273}
 274
 275static __always_inline void
 276scif_send_msg_intr(struct scif_dev *scifdev)
 277{
 278        struct scif_hw_dev *sdev = scifdev->sdev;
 279
 280        if (scifdev_is_p2p(scifdev))
 281                sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
 282        else
 283                sdev->hw_ops->send_intr(sdev, scifdev->rdb);
 284}
 285
 286int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
 287{
 288        int err = 0;
 289        struct scifmsg msg;
 290
 291        err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
 292        if (!err) {
 293                /*
 294                 * Now that everything is setup and mapped, we're ready
 295                 * to tell the peer about our queue's location
 296                 */
 297                msg.uop = SCIF_INIT;
 298                msg.dst.node = scifdev->node;
 299                err = scif_nodeqp_send(scifdev, &msg);
 300        }
 301        return err;
 302}
 303
 304void scif_send_exit(struct scif_dev *scifdev)
 305{
 306        struct scifmsg msg;
 307        int ret;
 308
 309        scifdev->exit = OP_IN_PROGRESS;
 310        msg.uop = SCIF_EXIT;
 311        msg.src.node = scif_info.nodeid;
 312        msg.dst.node = scifdev->node;
 313        ret = scif_nodeqp_send(scifdev, &msg);
 314        if (ret)
 315                goto done;
 316        /* Wait for a SCIF_EXIT_ACK message */
 317        wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
 318                           SCIF_NODE_ALIVE_TIMEOUT);
 319done:
 320        scifdev->exit = OP_IDLE;
 321}
 322
 323int scif_setup_qp(struct scif_dev *scifdev)
 324{
 325        int err = 0;
 326        int local_size;
 327        struct scif_qp *qp;
 328
 329        local_size = SCIF_NODE_QP_SIZE;
 330
 331        qp = kzalloc(sizeof(*qp), GFP_KERNEL);
 332        if (!qp) {
 333                err = -ENOMEM;
 334                return err;
 335        }
 336        qp->magic = SCIFEP_MAGIC;
 337        scifdev->qpairs = qp;
 338        err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
 339                                    local_size, scifdev);
 340        if (err)
 341                goto free_qp;
 342        /*
 343         * We're as setup as we can be. The inbound_q is setup, w/o a usable
 344         * outbound q.  When we get a message, the read_ptr will be updated,
 345         * and we will pull the message.
 346         */
 347        return err;
 348free_qp:
 349        kfree(scifdev->qpairs);
 350        scifdev->qpairs = NULL;
 351        return err;
 352}
 353
 /* Free a scatterlist table allocated by scif_p2p_setsg(); NULL is fine */
 static void scif_p2p_freesg(struct scatterlist *sg)
 {
         kfree(sg);
 }
 358
 359static struct scatterlist *
 360scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt)
 361{
 362        struct scatterlist *sg;
 363        struct page *page;
 364        int i;
 365
 366        sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
 367        if (!sg)
 368                return NULL;
 369        sg_init_table(sg, page_cnt);
 370        for (i = 0; i < page_cnt; i++) {
 371                page = pfn_to_page(pa >> PAGE_SHIFT);
 372                sg_set_page(&sg[i], page, page_size, 0);
 373                pa += page_size;
 374        }
 375        return sg;
 376}
 377
 378/* Init p2p mappings required to access peerdev from scifdev */
 379static struct scif_p2p_info *
 380scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
 381{
 382        struct scif_p2p_info *p2p;
 383        int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
 384        struct scif_hw_dev *psdev = peerdev->sdev;
 385        struct scif_hw_dev *sdev = scifdev->sdev;
 386
 387        num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
 388        num_aper_pages = psdev->aper->len >> PAGE_SHIFT;
 389
 390        p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
 391        if (!p2p)
 392                return NULL;
 393        p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa,
 394                                                    PAGE_SIZE, num_mmio_pages);
 395        if (!p2p->ppi_sg[SCIF_PPI_MMIO])
 396                goto free_p2p;
 397        p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
 398        sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
 399        num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
 400        p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa,
 401                                                    1 << sg_page_shift,
 402                                                    num_aper_chunks);
 403        p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
 404        err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 405                         num_mmio_pages, PCI_DMA_BIDIRECTIONAL);
 406        if (err != num_mmio_pages)
 407                goto scif_p2p_free;
 408        err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
 409                         num_aper_chunks, PCI_DMA_BIDIRECTIONAL);
 410        if (err != num_aper_chunks)
 411                goto dma_unmap;
 412        p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
 413        p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
 414        p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
 415        p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
 416        p2p->ppi_peer_id = peerdev->node;
 417        return p2p;
 418dma_unmap:
 419        dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 420                     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
 421scif_p2p_free:
 422        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
 423        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
 424free_p2p:
 425        kfree(p2p);
 426        return NULL;
 427}
 428
 429/* Uninitialize and release resources from a p2p mapping */
 430static void scif_deinit_p2p_info(struct scif_dev *scifdev,
 431                                 struct scif_p2p_info *p2p)
 432{
 433        struct scif_hw_dev *sdev = scifdev->sdev;
 434
 435        dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 436                     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
 437        dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
 438                     p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL);
 439        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
 440        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
 441        kfree(p2p);
 442}
 443
 444/**
 445 * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message
 446 * @scifdev: SCIF device
 447 * @dst: Destination node
 448 *
 449 * Connect the src and dst node by setting up the p2p connection
 450 * between them. Management node here acts like a proxy.
 451 */
 452static void scif_node_connect(struct scif_dev *scifdev, int dst)
 453{
 454        struct scif_dev *dev_j = scifdev;
 455        struct scif_dev *dev_i = NULL;
 456        struct scif_p2p_info *p2p_ij = NULL;    /* bus addr for j from i */
 457        struct scif_p2p_info *p2p_ji = NULL;    /* bus addr for i from j */
 458        struct scif_p2p_info *p2p;
 459        struct list_head *pos, *tmp;
 460        struct scifmsg msg;
 461        int err;
 462        u64 tmppayload;
 463
 464        if (dst < 1 || dst > scif_info.maxid)
 465                return;
 466
 467        dev_i = &scif_dev[dst];
 468
 469        if (!_scifdev_alive(dev_i))
 470                return;
 471        /*
 472         * If the p2p connection is already setup or in the process of setting
 473         * up then just ignore this request. The requested node will get
 474         * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
 475         */
 476        if (!list_empty(&dev_i->p2p)) {
 477                list_for_each_safe(pos, tmp, &dev_i->p2p) {
 478                        p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
 479                        if (p2p->ppi_peer_id == dev_j->node)
 480                                return;
 481                }
 482        }
 483        p2p_ij = scif_init_p2p_info(dev_i, dev_j);
 484        if (!p2p_ij)
 485                return;
 486        p2p_ji = scif_init_p2p_info(dev_j, dev_i);
 487        if (!p2p_ji) {
 488                scif_deinit_p2p_info(dev_i, p2p_ij);
 489                return;
 490        }
 491        list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
 492        list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);
 493
 494        /*
 495         * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
 496         * as seen from dev_j
 497         */
 498        msg.uop = SCIF_NODE_ADD;
 499        msg.src.node = dev_j->node;
 500        msg.dst.node = dev_i->node;
 501
 502        msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
 503        msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
 504        msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
 505        msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
 506
 507        err = scif_nodeqp_send(dev_i,  &msg);
 508        if (err) {
 509                dev_err(&scifdev->sdev->dev,
 510                        "%s %d error %d\n", __func__, __LINE__, err);
 511                return;
 512        }
 513
 514        /* Same as above but to dev_j */
 515        msg.uop = SCIF_NODE_ADD;
 516        msg.src.node = dev_i->node;
 517        msg.dst.node = dev_j->node;
 518
 519        tmppayload = msg.payload[0];
 520        msg.payload[0] = msg.payload[2];
 521        msg.payload[2] = tmppayload;
 522        msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
 523        msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
 524
 525        scif_nodeqp_send(dev_j, &msg);
 526}
 527
 528static void scif_p2p_setup(void)
 529{
 530        int i, j;
 531
 532        if (!scif_info.p2p_enable)
 533                return;
 534
 535        for (i = 1; i <= scif_info.maxid; i++)
 536                if (!_scifdev_alive(&scif_dev[i]))
 537                        return;
 538
 539        for (i = 1; i <= scif_info.maxid; i++) {
 540                for (j = 1; j <= scif_info.maxid; j++) {
 541                        struct scif_dev *scifdev = &scif_dev[i];
 542
 543                        if (i == j)
 544                                continue;
 545                        scif_node_connect(scifdev, j);
 546                }
 547        }
 548}
 549
 /*
  * Printable names for node QP messages, indexed by message type (uop).
  * The table is only read, so both the pointers and the strings are const.
  */
 static const char * const message_types[] = {
         "BAD",
         "INIT",
         "EXIT",
         "SCIF_EXIT_ACK",
         "SCIF_NODE_ADD",
         "SCIF_NODE_ADD_ACK",
         "SCIF_NODE_ADD_NACK",
         "REMOVE_NODE",
         "REMOVE_NODE_ACK",
         "CNCT_REQ",
         "CNCT_GNT",
         "CNCT_GNTACK",
         "CNCT_GNTNACK",
         "CNCT_REJ",
         "DISCNCT",
         "DISCNT_ACK",
         "CLIENT_SENT",
         "CLIENT_RCVD",
         "SCIF_GET_NODE_INFO",
         "REGISTER",
         "REGISTER_ACK",
         "REGISTER_NACK",
         "UNREGISTER",
         "UNREGISTER_ACK",
         "UNREGISTER_NACK",
         "ALLOC_REQ",
         "ALLOC_GNT",
         "ALLOC_REJ",
         "FREE_PHYS",
         "FREE_VIRT",
         "MUNMAP",
         "MARK",
         "MARK_ACK",
         "MARK_NACK",
         "WAIT",
         "WAIT_ACK",
         "WAIT_NACK",
         "SIGNAL_LOCAL",
         "SIGNAL_REMOTE",
         "SIG_ACK",
         "SIG_NACK",
 };
 591
 592static void
 593scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
 594                     const char *label)
 595{
 596        if (!scif_info.en_msg_log)
 597                return;
 598        if (msg->uop > SCIF_MAX_MSG) {
 599                dev_err(&scifdev->sdev->dev,
 600                        "%s: unknown msg type %d\n", label, msg->uop);
 601                return;
 602        }
 603        dev_info(&scifdev->sdev->dev,
 604                 "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
 605                 label, message_types[msg->uop], msg->src.node, msg->src.port,
 606                 msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
 607                 msg->payload[2], msg->payload[3]);
 608}
 609
 610int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
 611{
 612        struct scif_qp *qp = scifdev->qpairs;
 613        int err = -ENOMEM, loop_cnt = 0;
 614
 615        scif_display_message(scifdev, msg, "Sent");
 616        if (!qp) {
 617                err = -EINVAL;
 618                goto error;
 619        }
 620        spin_lock(&qp->send_lock);
 621
 622        while ((err = scif_rb_write(&qp->outbound_q,
 623                                    msg, sizeof(struct scifmsg)))) {
 624                mdelay(1);
 625#define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
 626                if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
 627                        err = -ENODEV;
 628                        break;
 629                }
 630        }
 631        if (!err)
 632                scif_rb_commit(&qp->outbound_q);
 633        spin_unlock(&qp->send_lock);
 634        if (!err) {
 635                if (scifdev_self(scifdev))
 636                        /*
 637                         * For loopback we need to emulate an interrupt by
 638                         * queuing work for the queue handling real node
 639                         * Qp interrupts.
 640                         */
 641                        queue_work(scifdev->intr_wq, &scifdev->intr_bh);
 642                else
 643                        scif_send_msg_intr(scifdev);
 644        }
 645error:
 646        if (err)
 647                dev_dbg(&scifdev->sdev->dev,
 648                        "%s %d error %d uop %d\n",
 649                         __func__, __LINE__, err, msg->uop);
 650        return err;
 651}
 652
 653/**
 654 * scif_nodeqp_send - Send a message on the node queue pair
 655 * @scifdev: Scif Device.
 656 * @msg: The message to be sent.
 657 */
 658int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
 659{
 660        int err;
 661        struct device *spdev = NULL;
 662
 663        if (msg->uop > SCIF_EXIT_ACK) {
 664                /* Don't send messages once the exit flow has begun */
 665                if (OP_IDLE != scifdev->exit)
 666                        return -ENODEV;
 667                spdev = scif_get_peer_dev(scifdev);
 668                if (IS_ERR(spdev)) {
 669                        err = PTR_ERR(spdev);
 670                        return err;
 671                }
 672        }
 673        err = _scif_nodeqp_send(scifdev, msg);
 674        if (msg->uop > SCIF_EXIT_ACK)
 675                scif_put_peer_dev(spdev);
 676        return err;
 677}
 678
 /*
  * scif_misc_handler:
  *
  * Work queue handler that runs the miscellaneous SCIF housekeeping tasks,
  * in this order:
  * 1) Handling of remote fence requests.
  * 2) Destruction of RMA windows, e.g. the temporary registered windows
  *    created during scif_vreadfrom()/scif_vwriteto().
  * 3) Destruction of invalid temporary cached windows.
  * 4) Cleanup of zombie endpoints.
  *
  * @work is not used.
  */
 void scif_misc_handler(struct work_struct *work)
 {
         scif_rma_handle_remote_fences();
         scif_rma_destroy_windows();
         scif_rma_destroy_tcw_invalid();
         scif_cleanup_zombie_epd();
 }
 696
 697/**
 698 * scif_init() - Respond to SCIF_INIT interrupt message
 699 * @scifdev:    Remote SCIF device node
 700 * @msg:        Interrupt message
 701 */
 702static __always_inline void
 703scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
 704{
 705        /*
 706         * Allow the thread waiting for device page updates for the peer QP DMA
 707         * address to complete initializing the inbound_q.
 708         */
 709        flush_delayed_work(&scifdev->qp_dwork);
 710
 711        scif_peer_register_device(scifdev);
 712
 713        if (scif_is_mgmt_node()) {
 714                mutex_lock(&scif_info.conflock);
 715                scif_p2p_setup();
 716                mutex_unlock(&scif_info.conflock);
 717        }
 718}
 719
 720/**
 721 * scif_exit() - Respond to SCIF_EXIT interrupt message
 722 * @scifdev:    Remote SCIF device node
 723 * @unused:     Interrupt message (unused)
 724 *
 725 * This function stops the SCIF interface for the node which sent
 726 * the SCIF_EXIT message and starts waiting for that node to
 727 * resetup the queue pair again.
 728 */
 729static __always_inline void
 730scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
 731{
 732        scifdev->exit_ack_pending = true;
 733        if (scif_is_mgmt_node())
 734                scif_disconnect_node(scifdev->node, false);
 735        else
 736                scif_stop(scifdev);
 737        schedule_delayed_work(&scifdev->qp_dwork,
 738                              msecs_to_jiffies(1000));
 739}
 740
 741/**
 742 * scif_exitack() - Respond to SCIF_EXIT_ACK interrupt message
 743 * @scifdev:    Remote SCIF device node
 744 * @unused:     Interrupt message (unused)
 745 *
 746 */
 747static __always_inline void
 748scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
 749{
 750        scifdev->exit = OP_COMPLETED;
 751        wake_up(&scif_info.exitwq);
 752}
 753
 754/**
 755 * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
 756 * @scifdev:    Remote SCIF device node
 757 * @msg:        Interrupt message
 758 *
 759 * When the mgmt node driver has finished initializing a MIC node queue pair it
 760 * marks the node as online. It then looks for all currently online MIC cards
 761 * and sends a SCIF_NODE_ADD message to identify the ID of the new card for
 762 * peer to peer initialization
 763 *
 764 * The local node allocates its incoming queue and sends its address in the
 765 * SCIF_NODE_ADD_ACK message back to the mgmt node, the mgmt node "reflects"
 766 * this message to the new node
 767 */
 768static __always_inline void
 769scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
 770{
 771        struct scif_dev *newdev;
 772        dma_addr_t qp_offset;
 773        int qp_connect;
 774        struct scif_hw_dev *sdev;
 775
 776        dev_dbg(&scifdev->sdev->dev,
 777                "Scifdev %d:%d received NODE_ADD msg for node %d\n",
 778                scifdev->node, msg->dst.node, msg->src.node);
 779        dev_dbg(&scifdev->sdev->dev,
 780                "Remote address for this node's aperture %llx\n",
 781                msg->payload[0]);
 782        newdev = &scif_dev[msg->src.node];
 783        newdev->node = msg->src.node;
 784        newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
 785        sdev = newdev->sdev;
 786
 787        if (scif_setup_intr_wq(newdev)) {
 788                dev_err(&scifdev->sdev->dev,
 789                        "failed to setup interrupts for %d\n", msg->src.node);
 790                goto interrupt_setup_error;
 791        }
 792        newdev->mmio.va = ioremap(msg->payload[1], sdev->mmio->len);
 793        if (!newdev->mmio.va) {
 794                dev_err(&scifdev->sdev->dev,
 795                        "failed to map mmio for %d\n", msg->src.node);
 796                goto mmio_map_error;
 797        }
 798        newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
 799        if (!newdev->qpairs)
 800                goto qp_alloc_error;
 801        /*
 802         * Set the base address of the remote node's memory since it gets
 803         * added to qp_offset
 804         */
 805        newdev->base_addr = msg->payload[0];
 806
 807        qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
 808                                           SCIF_NODE_QP_SIZE, newdev);
 809        if (qp_connect) {
 810                dev_err(&scifdev->sdev->dev,
 811                        "failed to setup qp_connect %d\n", qp_connect);
 812                goto qp_connect_error;
 813        }
 814
 815        newdev->db = sdev->hw_ops->next_db(sdev);
 816        newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
 817                                                   "SCIF_INTR", newdev,
 818                                                   newdev->db);
 819        if (IS_ERR(newdev->cookie))
 820                goto qp_connect_error;
 821        newdev->qpairs->magic = SCIFEP_MAGIC;
 822        newdev->qpairs->qp_state = SCIF_QP_OFFLINE;
 823
 824        msg->uop = SCIF_NODE_ADD_ACK;
 825        msg->dst.node = msg->src.node;
 826        msg->src.node = scif_info.nodeid;
 827        msg->payload[0] = qp_offset;
 828        msg->payload[2] = newdev->db;
 829        scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
 830        return;
 831qp_connect_error:
 832        kfree(newdev->qpairs);
 833        newdev->qpairs = NULL;
 834qp_alloc_error:
 835        iounmap(newdev->mmio.va);
 836        newdev->mmio.va = NULL;
 837mmio_map_error:
 838interrupt_setup_error:
 839        dev_err(&scifdev->sdev->dev,
 840                "node add failed for node %d\n", msg->src.node);
 841        msg->uop = SCIF_NODE_ADD_NACK;
 842        msg->dst.node = msg->src.node;
 843        msg->src.node = scif_info.nodeid;
 844        scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
 845}
 846
 847void scif_poll_qp_state(struct work_struct *work)
 848{
 849#define SCIF_NODE_QP_RETRY 100
 850#define SCIF_NODE_QP_TIMEOUT 100
 851        struct scif_dev *peerdev = container_of(work, struct scif_dev,
 852                                                        p2p_dwork.work);
 853        struct scif_qp *qp = &peerdev->qpairs[0];
 854
 855        if (qp->qp_state != SCIF_QP_ONLINE ||
 856            qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
 857                if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
 858                        dev_err(&peerdev->sdev->dev,
 859                                "Warning: QP check timeout with state %d\n",
 860                                qp->qp_state);
 861                        goto timeout;
 862                }
 863                schedule_delayed_work(&peerdev->p2p_dwork,
 864                                      msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
 865                return;
 866        }
 867        return;
 868timeout:
 869        dev_err(&peerdev->sdev->dev,
 870                "%s %d remote node %d offline,  state = 0x%x\n",
 871                __func__, __LINE__, peerdev->node, qp->qp_state);
 872        qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
 873        scif_peer_unregister_device(peerdev);
 874        scif_cleanup_scifdev(peerdev);
 875}
 876
 877/**
 878 * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
 879 * @scifdev:    Remote SCIF device node
 880 * @msg:        Interrupt message
 881 *
 882 * After a MIC node receives the SCIF_NODE_ADD_ACK message it send this
 883 * message to the mgmt node to confirm the sequence is finished.
 884 *
 885 */
 886static __always_inline void
 887scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
 888{
 889        struct scif_dev *peerdev;
 890        struct scif_qp *qp;
 891        struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
 892
 893        dev_dbg(&scifdev->sdev->dev,
 894                "Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
 895                scifdev->node, msg->src.node, msg->dst.node);
 896        dev_dbg(&scifdev->sdev->dev,
 897                "payload %llx %llx %llx %llx\n", msg->payload[0],
 898                msg->payload[1], msg->payload[2], msg->payload[3]);
 899        if (scif_is_mgmt_node()) {
 900                /*
 901                 * the lock serializes with scif_qp_response_ack. The mgmt node
 902                 * is forwarding the NODE_ADD_ACK message from src to dst we
 903                 * need to make sure that the dst has already received a
 904                 * NODE_ADD for src and setup its end of the qp to dst
 905                 */
 906                mutex_lock(&scif_info.conflock);
 907                msg->payload[1] = scif_info.maxid;
 908                scif_nodeqp_send(dst_dev, msg);
 909                mutex_unlock(&scif_info.conflock);
 910                return;
 911        }
 912        peerdev = &scif_dev[msg->src.node];
 913        peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
 914        peerdev->node = msg->src.node;
 915
 916        qp = &peerdev->qpairs[0];
 917
 918        if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
 919                                            msg->payload[0])))
 920                goto local_error;
 921        peerdev->rdb = msg->payload[2];
 922        qp->remote_qp->qp_state = SCIF_QP_ONLINE;
 923
 924        scif_peer_register_device(peerdev);
 925
 926        schedule_delayed_work(&peerdev->p2p_dwork, 0);
 927        return;
 928local_error:
 929        scif_cleanup_scifdev(peerdev);
 930}
 931
 932/**
 933 * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message
 934 * @scifdev:    Remote SCIF device node
 935 * @msg:        Interrupt message
 936 *
 937 * SCIF_NODE_ADD failed, so inform the waiting wq.
 938 */
 939static __always_inline void
 940scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
 941{
 942        if (scif_is_mgmt_node()) {
 943                struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
 944
 945                dev_dbg(&scifdev->sdev->dev,
 946                        "SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
 947                scif_nodeqp_send(dst_dev, msg);
 948        }
 949}
 950
 951/**
 952 * scif_node_remove: Handle SCIF_NODE_REMOVE message
 953 * @scifdev:    Remote SCIF device node
 954 * @msg: Interrupt message
 955 *
 956 * Handle node removal.
 957 */
 958static __always_inline void
 959scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
 960{
 961        int node = msg->payload[0];
 962        struct scif_dev *scdev = &scif_dev[node];
 963
 964        scdev->node_remove_ack_pending = true;
 965        scif_handle_remove_node(node);
 966}
 967
 968/**
 969 * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message
 970 * @scifdev:    Remote SCIF device node
 971 * @msg: Interrupt message
 972 *
 973 * The peer has acked a SCIF_NODE_REMOVE message.
 974 */
 975static __always_inline void
 976scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
 977{
 978        struct scif_dev *sdev = &scif_dev[msg->payload[0]];
 979
 980        atomic_inc(&sdev->disconn_rescnt);
 981        wake_up(&sdev->disconn_wq);
 982}
 983
 984/**
 985 * scif_get_node_info: Respond to SCIF_GET_NODE_INFO interrupt message
 986 * @scifdev:    Remote SCIF device node
 987 * @msg:        Interrupt message
 988 *
 989 * Retrieve node info i.e maxid and total from the mgmt node.
 990 */
 991static __always_inline void
 992scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
 993{
 994        if (scif_is_mgmt_node()) {
 995                swap(msg->dst.node, msg->src.node);
 996                mutex_lock(&scif_info.conflock);
 997                msg->payload[1] = scif_info.maxid;
 998                msg->payload[2] = scif_info.total;
 999                mutex_unlock(&scif_info.conflock);
1000                scif_nodeqp_send(scifdev, msg);
1001        } else {
1002                struct completion *node_info =
1003                        (struct completion *)msg->payload[3];
1004
1005                mutex_lock(&scif_info.conflock);
1006                scif_info.maxid = msg->payload[1];
1007                scif_info.total = msg->payload[2];
1008                complete_all(node_info);
1009                mutex_unlock(&scif_info.conflock);
1010        }
1011}
1012
1013static void
1014scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
1015{
1016        /* Bogus Node Qp Message? */
1017        dev_err(&scifdev->sdev->dev,
1018                "Unknown message 0x%xn scifdev->node 0x%x\n",
1019                msg->uop, scifdev->node);
1020}
1021
/*
 * Dispatch table for node QP messages, indexed by msg->uop in
 * scif_nodeqp_msg_handler(). The trailing comment on each entry names the
 * opcode that slot is meant to serve, so entries must not be reordered.
 *
 * NOTE(review): the array has SCIF_MAX_MSG + 1 slots; any slot without an
 * initializer below is a NULL pointer. Confirm that SCIF_MAX_MSG equals the
 * opcode of the last entry.
 */
static void (*scif_intr_func[SCIF_MAX_MSG + 1])
	    (struct scif_dev *, struct scifmsg *msg) = {
	scif_msg_unknown,	/* Error */
	scif_init,		/* SCIF_INIT */
	scif_exit,		/* SCIF_EXIT */
	scif_exit_ack,		/* SCIF_EXIT_ACK */
	scif_node_add,		/* SCIF_NODE_ADD */
	scif_node_add_ack,	/* SCIF_NODE_ADD_ACK */
	scif_node_add_nack,	/* SCIF_NODE_ADD_NACK */
	scif_node_remove,	/* SCIF_NODE_REMOVE */
	scif_node_remove_ack,	/* SCIF_NODE_REMOVE_ACK */
	scif_cnctreq,		/* SCIF_CNCT_REQ */
	scif_cnctgnt,		/* SCIF_CNCT_GNT */
	scif_cnctgnt_ack,	/* SCIF_CNCT_GNTACK */
	scif_cnctgnt_nack,	/* SCIF_CNCT_GNTNACK */
	scif_cnctrej,		/* SCIF_CNCT_REJ */
	scif_discnct,		/* SCIF_DISCNCT */
	scif_discnt_ack,	/* SCIF_DISCNT_ACK */
	scif_clientsend,	/* SCIF_CLIENT_SENT */
	scif_clientrcvd,	/* SCIF_CLIENT_RCVD */
	scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
	scif_recv_reg,		/* SCIF_REGISTER */
	scif_recv_reg_ack,	/* SCIF_REGISTER_ACK */
	scif_recv_reg_nack,	/* SCIF_REGISTER_NACK */
	scif_recv_unreg,	/* SCIF_UNREGISTER */
	scif_recv_unreg_ack,	/* SCIF_UNREGISTER_ACK */
	scif_recv_unreg_nack,	/* SCIF_UNREGISTER_NACK */
	scif_alloc_req,		/* SCIF_ALLOC_REQ */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_GNT */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_REJ */
	scif_free_virt,		/* SCIF_FREE_VIRT */
	scif_recv_munmap,	/* SCIF_MUNMAP */
	scif_recv_mark,		/* SCIF_MARK */
	scif_recv_mark_resp,	/* SCIF_MARK_ACK */
	scif_recv_mark_resp,	/* SCIF_MARK_NACK */
	scif_recv_wait,		/* SCIF_WAIT */
	scif_recv_wait_resp,	/* SCIF_WAIT_ACK */
	scif_recv_wait_resp,	/* SCIF_WAIT_NACK */
	scif_recv_sig_local,	/* SCIF_SIG_LOCAL */
	scif_recv_sig_remote,	/* SCIF_SIG_REMOTE */
	scif_recv_sig_resp,	/* SCIF_SIG_ACK */
	scif_recv_sig_resp,	/* SCIF_SIG_NACK */
};

/* Highest opcode scif_nodeqp_msg_handler() will dispatch through the table. */
static int scif_max_msg_id = SCIF_MAX_MSG;
1067/**
1068 * scif_nodeqp_msg_handler() - Common handler for node messages
1069 * @scifdev: Remote device to respond to
1070 * @qp: Remote memory pointer
1071 * @msg: The message to be handled.
1072 *
1073 * This routine calls the appropriate routine to handle a Node Qp
1074 * message receipt
1075 */
1076static void
1077scif_nodeqp_msg_handler(struct scif_dev *scifdev,
1078                        struct scif_qp *qp, struct scifmsg *msg)
1079{
1080        scif_display_message(scifdev, msg, "Rcvd");
1081
1082        if (msg->uop > (u32)scif_max_msg_id) {
1083                /* Bogus Node Qp Message? */
1084                dev_err(&scifdev->sdev->dev,
1085                        "Unknown message 0x%xn scifdev->node 0x%x\n",
1086                        msg->uop, scifdev->node);
1087                return;
1088        }
1089
1090        scif_intr_func[msg->uop](scifdev, msg);
1091}
1092
1093/**
1094 * scif_nodeqp_intrhandler() - Interrupt handler for node messages
1095 * @scifdev:    Remote device to respond to
1096 * @qp:         Remote memory pointer
1097 *
1098 * This routine is triggered by the interrupt mechanism.  It reads
1099 * messages from the node queue RB and calls the Node QP Message handling
1100 * routine.
1101 */
1102void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
1103{
1104        struct scifmsg msg;
1105        int read_size;
1106
1107        do {
1108                read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
1109                if (!read_size)
1110                        break;
1111                scif_nodeqp_msg_handler(scifdev, qp, &msg);
1112                /*
1113                 * The node queue pair is unmapped so skip the read pointer
1114                 * update after receipt of a SCIF_EXIT_ACK
1115                 */
1116                if (SCIF_EXIT_ACK == msg.uop)
1117                        break;
1118                scif_rb_update_read_ptr(&qp->inbound_q);
1119        } while (1);
1120}
1121
1122/**
1123 * scif_loopb_wq_handler - Loopback Workqueue Handler.
1124 * @unused: loop back work (unused)
1125 *
1126 * This work queue routine is invoked by the loopback work queue handler.
1127 * It grabs the recv lock, dequeues any available messages from the head
1128 * of the loopback message list, calls the node QP message handler,
1129 * waits for it to return, then frees up this message and dequeues more
1130 * elements of the list if available.
1131 */
1132static void scif_loopb_wq_handler(struct work_struct *unused)
1133{
1134        struct scif_dev *scifdev = scif_info.loopb_dev;
1135        struct scif_qp *qp = scifdev->qpairs;
1136        struct scif_loopb_msg *msg;
1137
1138        do {
1139                msg = NULL;
1140                spin_lock(&qp->recv_lock);
1141                if (!list_empty(&scif_info.loopb_recv_q)) {
1142                        msg = list_first_entry(&scif_info.loopb_recv_q,
1143                                               struct scif_loopb_msg,
1144                                               list);
1145                        list_del(&msg->list);
1146                }
1147                spin_unlock(&qp->recv_lock);
1148
1149                if (msg) {
1150                        scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
1151                        kfree(msg);
1152                }
1153        } while (msg);
1154}
1155
1156/**
1157 * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
1158 * @scifdev: SCIF device
1159 * @qp: Queue pair.
1160 *
1161 * This work queue routine is triggered when a loopback message is received.
1162 *
1163 * We need special handling for receiving Node Qp messages on a loopback SCIF
1164 * device via two workqueues for receiving messages.
1165 *
1166 * The reason we need the extra workqueue which is not required with *normal*
1167 * non-loopback SCIF devices is the potential classic deadlock described below:
1168 *
1169 * Thread A tries to send a message on a loopback SCIF device and blocks since
1170 * there is no space in the RB while it has the send_lock held or another
1171 * lock called lock X for example.
1172 *
1173 * Thread B: The Loopback Node QP message receive workqueue receives the message
1174 * and tries to send a message (eg an ACK) to the loopback SCIF device. It tries
1175 * to grab the send lock again or lock X and deadlocks with Thread A. The RB
1176 * cannot be drained any further due to this classic deadlock.
1177 *
1178 * In order to avoid deadlocks as mentioned above we have an extra level of
1179 * indirection achieved by having two workqueues.
1180 * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
1181 * messages from the Node QP RB, adds them to a list and queues work for the
1182 * second workqueue.
1183 *
1184 * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
1185 * messages from the list, handles them, frees up the memory and dequeues
1186 * more elements from the list if possible.
1187 */
1188int
1189scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
1190{
1191        int read_size;
1192        struct scif_loopb_msg *msg;
1193
1194        do {
1195                msg = kmalloc(sizeof(*msg), GFP_KERNEL);
1196                if (!msg)
1197                        return -ENOMEM;
1198                read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
1199                                             sizeof(struct scifmsg));
1200                if (read_size != sizeof(struct scifmsg)) {
1201                        kfree(msg);
1202                        scif_rb_update_read_ptr(&qp->inbound_q);
1203                        break;
1204                }
1205                spin_lock(&qp->recv_lock);
1206                list_add_tail(&msg->list, &scif_info.loopb_recv_q);
1207                spin_unlock(&qp->recv_lock);
1208                queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
1209                scif_rb_update_read_ptr(&qp->inbound_q);
1210        } while (read_size == sizeof(struct scifmsg));
1211        return read_size;
1212}
1213
1214/**
1215 * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
1216 * @scifdev: SCIF device
1217 *
1218 * Sets up the required loopback workqueues, queue pairs and ring buffers
1219 */
1220int scif_setup_loopback_qp(struct scif_dev *scifdev)
1221{
1222        int err = 0;
1223        void *local_q;
1224        struct scif_qp *qp;
1225
1226        err = scif_setup_intr_wq(scifdev);
1227        if (err)
1228                goto exit;
1229        INIT_LIST_HEAD(&scif_info.loopb_recv_q);
1230        snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
1231                 "SCIF LOOPB %d", scifdev->node);
1232        scif_info.loopb_wq =
1233                alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
1234        if (!scif_info.loopb_wq) {
1235                err = -ENOMEM;
1236                goto destroy_intr;
1237        }
1238        INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
1239        /* Allocate Self Qpair */
1240        scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
1241        if (!scifdev->qpairs) {
1242                err = -ENOMEM;
1243                goto destroy_loopb_wq;
1244        }
1245
1246        qp = scifdev->qpairs;
1247        qp->magic = SCIFEP_MAGIC;
1248        spin_lock_init(&qp->send_lock);
1249        spin_lock_init(&qp->recv_lock);
1250
1251        local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
1252        if (!local_q) {
1253                err = -ENOMEM;
1254                goto free_qpairs;
1255        }
1256        /*
1257         * For loopback the inbound_q and outbound_q are essentially the same
1258         * since the Node sends a message on the loopback interface to the
1259         * outbound_q which is then received on the inbound_q.
1260         */
1261        scif_rb_init(&qp->outbound_q,
1262                     &qp->local_read,
1263                     &qp->local_write,
1264                     local_q, get_count_order(SCIF_NODE_QP_SIZE));
1265
1266        scif_rb_init(&qp->inbound_q,
1267                     &qp->local_read,
1268                     &qp->local_write,
1269                     local_q, get_count_order(SCIF_NODE_QP_SIZE));
1270        scif_info.nodeid = scifdev->node;
1271
1272        scif_peer_register_device(scifdev);
1273
1274        scif_info.loopb_dev = scifdev;
1275        return err;
1276free_qpairs:
1277        kfree(scifdev->qpairs);
1278destroy_loopb_wq:
1279        destroy_workqueue(scif_info.loopb_wq);
1280destroy_intr:
1281        scif_destroy_intr_wq(scifdev);
1282exit:
1283        return err;
1284}
1285
1286/**
1287 * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
1288 * @scifdev: SCIF device
1289 *
1290 * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
1291 */
1292int scif_destroy_loopback_qp(struct scif_dev *scifdev)
1293{
1294        scif_peer_unregister_device(scifdev);
1295        destroy_workqueue(scif_info.loopb_wq);
1296        scif_destroy_intr_wq(scifdev);
1297        kfree(scifdev->qpairs->outbound_q.rb_base);
1298        kfree(scifdev->qpairs);
1299        scifdev->sdev = NULL;
1300        scif_info.loopb_dev = NULL;
1301        return 0;
1302}
1303
1304void scif_destroy_p2p(struct scif_dev *scifdev)
1305{
1306        struct scif_dev *peer_dev;
1307        struct scif_p2p_info *p2p;
1308        struct list_head *pos, *tmp;
1309        int bd;
1310
1311        mutex_lock(&scif_info.conflock);
1312        /* Free P2P mappings in the given node for all its peer nodes */
1313        list_for_each_safe(pos, tmp, &scifdev->p2p) {
1314                p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1315                dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
1316                             p2p->sg_nentries[SCIF_PPI_MMIO],
1317                             DMA_BIDIRECTIONAL);
1318                dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
1319                             p2p->sg_nentries[SCIF_PPI_APER],
1320                             DMA_BIDIRECTIONAL);
1321                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1322                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1323                list_del(pos);
1324                kfree(p2p);
1325        }
1326
1327        /* Free P2P mapping created in the peer nodes for the given node */
1328        for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
1329                peer_dev = &scif_dev[bd];
1330                list_for_each_safe(pos, tmp, &peer_dev->p2p) {
1331                        p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1332                        if (p2p->ppi_peer_id == scifdev->node) {
1333                                dma_unmap_sg(&peer_dev->sdev->dev,
1334                                             p2p->ppi_sg[SCIF_PPI_MMIO],
1335                                             p2p->sg_nentries[SCIF_PPI_MMIO],
1336                                             DMA_BIDIRECTIONAL);
1337                                dma_unmap_sg(&peer_dev->sdev->dev,
1338                                             p2p->ppi_sg[SCIF_PPI_APER],
1339                                             p2p->sg_nentries[SCIF_PPI_APER],
1340                                             DMA_BIDIRECTIONAL);
1341                                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1342                                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1343                                list_del(pos);
1344                                kfree(p2p);
1345                        }
1346                }
1347        }
1348        mutex_unlock(&scif_info.conflock);
1349}
1350