/* linux/drivers/misc/mic/scif/scif_nodeqp.c */
   1/*
   2 * Intel MIC Platform Software Stack (MPSS)
   3 *
   4 * Copyright(c) 2014 Intel Corporation.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License, version 2, as
   8 * published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13 * General Public License for more details.
  14 *
  15 * Intel SCIF driver.
  16 *
  17 */
  18#include "../bus/scif_bus.h"
  19#include "scif_peer_bus.h"
  20#include "scif_main.h"
  21#include "scif_nodeqp.h"
  22#include "scif_map.h"
  23
  24/*
  25 ************************************************************************
  26 * SCIF node Queue Pair (QP) setup flow:
  27 *
  28 * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
  29 * 2) scif_setup_qp(..) allocates the local qp and calls
  30 *      scif_setup_qp_connect(..) which allocates and maps the local
  31 *      buffer for the inbound QP
  32 * 3) The local node updates the device page with the DMA address of the QP
  33 * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
  34 *      the peer node has updated its QP DMA address
  35 * 5) Once a valid non zero address is found in the QP DMA address field
  36 *      in the device page, the local node maps the remote node's QP,
  37 *      updates its outbound QP and sends a SCIF_INIT message to the peer
  38 * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom
  39 *      half handler by calling scif_init(..)
  40 * 7) scif_init(..) registers a new SCIF peer node by calling
  41 *      scif_peer_register_device(..) which signifies the addition of a new
  42 *      SCIF node
  43 * 8) On the mgmt node, P2P network setup/teardown is initiated if all the
  44 *      remote nodes are online via scif_p2p_setup(..)
  45 * 9) For P2P setup, the host maps the remote nodes' aperture and memory
  46 *      bars and sends a SCIF_NODE_ADD message to both nodes
  47 * 10) As part of scif_nodeadd, both nodes set up their local inbound
  48 *      QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
  49 * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
  50 *      SCIF_NODE_ADD_ACK to the remote nodes
  51 * 12) As part of scif_node_add_ack(..) the remote nodes update their
  52 *      outbound QPs, make sure they can access memory on the remote node
  53 *      and then add a new SCIF peer node by calling
  54 *      scif_peer_register_device(..) which signifies the addition of a new
  55 *      SCIF node.
  56 * 13) The SCIF network is now established across all nodes.
  57 *
  58 ************************************************************************
  59 * SCIF node QP teardown flow (initiated by non mgmt node):
  60 *
  61 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
  62 * 2) The device page QP DMA address field is updated with 0x0
  63 * 3) A non mgmt node now cleans up all local data structures and sends a
  64 *      SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
  65 * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
  66 * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
  67 *      peers and waits for a SCIF_NODE_REMOVE_ACK
  68 * 6) As part of scif_node_remove(..) a remote node unregisters the peer
  69 *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
  70 * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
  71 *      it sends itself a node remove message whose handling cleans up local
  72 *      data structures and unregisters the peer node from the SCIF network
  73 * 8) The mgmt node sends a SCIF_EXIT_ACK
  74 * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
  75 *      completes the SCIF remove routine
  76 * 10) The SCIF network is now torn down for the node initiating the
  77 *      teardown sequence
  78 *
  79 ************************************************************************
  80 * SCIF node QP teardown flow (initiated by mgmt node):
  81 *
  82 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
  83 * 2) The device page QP DMA address field is updated with 0x0
  84 * 3) The mgmt node calls scif_disconnect_node(..)
  85 * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
  86 *      and waits for a SCIF_NODE_REMOVE_ACK
  87 * 5) As part of scif_node_remove(..) a remote node unregisters the peer
  88 *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
  89 * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
  90 *      it unregisters the peer node from the SCIF network
  91 * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
  92 * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
  93 *      which would clean up local data structures for all SCIF nodes and
  94 *      then send a SCIF_EXIT_ACK back to the mgmt node
 * 9) Upon receipt of the SCIF_EXIT_ACK the mgmt node sends itself a node
  96 *      remove message whose handling cleans up local data structures and
  97 *      destroys any P2P mappings.
  98 * 10) The SCIF hardware device for which a remove callback was received is now
  99 *      disconnected from the SCIF network.
 100 */
 101/*
 102 * Initializes "local" data structures for the QP. Allocates the QP
 103 * ring buffer (rb) and initializes the "in bound" queue.
 104 */
 105int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
 106                          int local_size, struct scif_dev *scifdev)
 107{
 108        void *local_q = qp->inbound_q.rb_base;
 109        int err = 0;
 110        u32 tmp_rd = 0;
 111
 112        spin_lock_init(&qp->send_lock);
 113        spin_lock_init(&qp->recv_lock);
 114
 115        /* Allocate rb only if not already allocated */
 116        if (!local_q) {
 117                local_q = kzalloc(local_size, GFP_KERNEL);
 118                if (!local_q) {
 119                        err = -ENOMEM;
 120                        return err;
 121                }
 122        }
 123
 124        err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
 125        if (err)
 126                goto kfree;
 127        /*
 128         * To setup the inbound_q, the buffer lives locally, the read pointer
 129         * is remote and the write pointer is local.
 130         */
 131        scif_rb_init(&qp->inbound_q,
 132                     &tmp_rd,
 133                     &qp->local_write,
 134                     local_q, get_count_order(local_size));
 135        /*
 136         * The read pointer is NULL initially and it is unsafe to use the ring
 137         * buffer til this changes!
 138         */
 139        qp->inbound_q.read_ptr = NULL;
 140        err = scif_map_single(qp_offset, qp,
 141                              scifdev, sizeof(struct scif_qp));
 142        if (err)
 143                goto unmap;
 144        qp->local_qp = *qp_offset;
 145        return err;
 146unmap:
 147        scif_unmap_single(qp->local_buf, scifdev, local_size);
 148        qp->local_buf = 0;
 149kfree:
 150        kfree(local_q);
 151        return err;
 152}
 153
/*
 * scif_setup_qp_accept() - set up a QP when the other side has already done
 * its allocation.
 *
 * @qp: local queue pair to initialize
 * @qp_offset: out parameter; receives the DMA address of @qp
 * @phys: DMA address of the peer's scif_qp structure
 * @local_size: size in bytes of the local inbound ring buffer
 * @scifdev: SCIF device of the peer node
 *
 * Maps the peer's QP and its ring buffer to initialize the outbound queue,
 * then allocates and maps a local buffer for the inbound queue. Returns 0
 * on success, -EIO/-ENOMEM on failure; every mapping and allocation is
 * unwound on the error paths.
 */
int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
			 dma_addr_t phys, int local_size,
			 struct scif_dev *scifdev)
{
	void *local_q;
	void *remote_q;
	struct scif_qp *remote_qp;
	int remote_size;
	int err = 0;

	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);
	/* Start by figuring out where we need to point */
	remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
	if (!remote_qp)
		return -EIO;
	qp->remote_qp = remote_qp;
	/* The peer stamps its QP with SCIFEP_MAGIC once it is initialized */
	if (qp->remote_qp->magic != SCIFEP_MAGIC) {
		err = -EIO;
		goto iounmap;
	}
	qp->remote_buf = remote_qp->local_buf;
	remote_size = qp->remote_qp->inbound_q.size;
	remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
	if (!remote_q) {
		err = -EIO;
		goto iounmap;
	}
	qp->remote_qp->local_write = 0;
	/*
	 * To setup the outbound_q, the buffer lives in remote memory,
	 * the read pointer is local, the write pointer is remote
	 */
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->remote_qp->local_write,
		     remote_q,
		     get_count_order(remote_size));
	local_q = kzalloc(local_size, GFP_KERNEL);
	if (!local_q) {
		err = -ENOMEM;
		goto iounmap_1;
	}
	err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
	if (err)
		goto kfree;
	qp->remote_qp->local_read = 0;
	/*
	 * To setup the inbound_q, the buffer lives locally, the read pointer
	 * is remote and the write pointer is local
	 */
	scif_rb_init(&qp->inbound_q,
		     &qp->remote_qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(local_size));
	err = scif_map_single(qp_offset, qp, scifdev,
			      sizeof(struct scif_qp));
	if (err)
		goto unmap;
	qp->local_qp = *qp_offset;
	return err;
unmap:
	scif_unmap_single(qp->local_buf, scifdev, local_size);
	qp->local_buf = 0;
kfree:
	kfree(local_q);
iounmap_1:
	scif_iounmap(remote_q, remote_size, scifdev);
	qp->outbound_q.rb_base = NULL;
iounmap:
	scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
	qp->remote_qp = NULL;
	return err;
}
 229
 230int scif_setup_qp_connect_response(struct scif_dev *scifdev,
 231                                   struct scif_qp *qp, u64 payload)
 232{
 233        int err = 0;
 234        void *r_buf;
 235        int remote_size;
 236        phys_addr_t tmp_phys;
 237
 238        qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);
 239
 240        if (!qp->remote_qp) {
 241                err = -ENOMEM;
 242                goto error;
 243        }
 244
 245        if (qp->remote_qp->magic != SCIFEP_MAGIC) {
 246                dev_err(&scifdev->sdev->dev,
 247                        "SCIFEP_MAGIC mismatch between self %d remote %d\n",
 248                        scif_dev[scif_info.nodeid].node, scifdev->node);
 249                err = -ENODEV;
 250                goto error;
 251        }
 252
 253        tmp_phys = qp->remote_qp->local_buf;
 254        remote_size = qp->remote_qp->inbound_q.size;
 255        r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);
 256
 257        if (!r_buf)
 258                return -EIO;
 259
 260        qp->local_read = 0;
 261        scif_rb_init(&qp->outbound_q,
 262                     &qp->local_read,
 263                     &qp->remote_qp->local_write,
 264                     r_buf,
 265                     get_count_order(remote_size));
 266        /*
 267         * Because the node QP may already be processing an INIT message, set
 268         * the read pointer so the cached read offset isn't lost
 269         */
 270        qp->remote_qp->local_read = qp->inbound_q.current_read_offset;
 271        /*
 272         * resetup the inbound_q now that we know where the
 273         * inbound_read really is.
 274         */
 275        scif_rb_init(&qp->inbound_q,
 276                     &qp->remote_qp->local_read,
 277                     &qp->local_write,
 278                     qp->inbound_q.rb_base,
 279                     get_count_order(qp->inbound_q.size));
 280error:
 281        return err;
 282}
 283
 284static __always_inline void
 285scif_send_msg_intr(struct scif_dev *scifdev)
 286{
 287        struct scif_hw_dev *sdev = scifdev->sdev;
 288
 289        if (scifdev_is_p2p(scifdev))
 290                sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
 291        else
 292                sdev->hw_ops->send_intr(sdev, scifdev->rdb);
 293}
 294
 295int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
 296{
 297        int err = 0;
 298        struct scifmsg msg;
 299
 300        err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
 301        if (!err) {
 302                /*
 303                 * Now that everything is setup and mapped, we're ready
 304                 * to tell the peer about our queue's location
 305                 */
 306                msg.uop = SCIF_INIT;
 307                msg.dst.node = scifdev->node;
 308                err = scif_nodeqp_send(scifdev, &msg);
 309        }
 310        return err;
 311}
 312
 313void scif_send_exit(struct scif_dev *scifdev)
 314{
 315        struct scifmsg msg;
 316        int ret;
 317
 318        scifdev->exit = OP_IN_PROGRESS;
 319        msg.uop = SCIF_EXIT;
 320        msg.src.node = scif_info.nodeid;
 321        msg.dst.node = scifdev->node;
 322        ret = scif_nodeqp_send(scifdev, &msg);
 323        if (ret)
 324                goto done;
 325        /* Wait for a SCIF_EXIT_ACK message */
 326        wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
 327                           SCIF_NODE_ALIVE_TIMEOUT);
 328done:
 329        scifdev->exit = OP_IDLE;
 330}
 331
 332int scif_setup_qp(struct scif_dev *scifdev)
 333{
 334        int err = 0;
 335        int local_size;
 336        struct scif_qp *qp;
 337
 338        local_size = SCIF_NODE_QP_SIZE;
 339
 340        qp = kzalloc(sizeof(*qp), GFP_KERNEL);
 341        if (!qp) {
 342                err = -ENOMEM;
 343                return err;
 344        }
 345        qp->magic = SCIFEP_MAGIC;
 346        scifdev->qpairs = qp;
 347        err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
 348                                    local_size, scifdev);
 349        if (err)
 350                goto free_qp;
 351        /*
 352         * We're as setup as we can be. The inbound_q is setup, w/o a usable
 353         * outbound q.  When we get a message, the read_ptr will be updated,
 354         * and we will pull the message.
 355         */
 356        return err;
 357free_qp:
 358        kfree(scifdev->qpairs);
 359        scifdev->qpairs = NULL;
 360        return err;
 361}
 362
/* Free a scatterlist allocated by scif_p2p_setsg() (NULL is a no-op) */
static void scif_p2p_freesg(struct scatterlist *sg)
{
	kfree(sg);
}
 367
/*
 * scif_p2p_setsg() - build a scatterlist covering a physically contiguous
 * region.
 * @pa: base physical address of the region
 * @page_size: size in bytes of each scatterlist chunk
 * @page_cnt: number of chunks
 *
 * Returns a kcalloc'ed scatterlist of @page_cnt entries, each describing
 * @page_size bytes starting at @pa, or NULL on allocation failure. The
 * caller releases it with scif_p2p_freesg().
 */
static struct scatterlist *
scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt)
{
	struct scatterlist *sg;
	struct page *page;
	int i;

	sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
	if (!sg)
		return NULL;
	sg_init_table(sg, page_cnt);
	for (i = 0; i < page_cnt; i++) {
		/* Look up the struct page for the current physical address */
		page = pfn_to_page(pa >> PAGE_SHIFT);
		sg_set_page(&sg[i], page, page_size, 0);
		pa += page_size;
	}
	return sg;
}
 386
 387/* Init p2p mappings required to access peerdev from scifdev */
 388static struct scif_p2p_info *
 389scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
 390{
 391        struct scif_p2p_info *p2p;
 392        int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
 393        struct scif_hw_dev *psdev = peerdev->sdev;
 394        struct scif_hw_dev *sdev = scifdev->sdev;
 395
 396        num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
 397        num_aper_pages = psdev->aper->len >> PAGE_SHIFT;
 398
 399        p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
 400        if (!p2p)
 401                return NULL;
 402        p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa,
 403                                                    PAGE_SIZE, num_mmio_pages);
 404        if (!p2p->ppi_sg[SCIF_PPI_MMIO])
 405                goto free_p2p;
 406        p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
 407        sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
 408        num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
 409        p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa,
 410                                                    1 << sg_page_shift,
 411                                                    num_aper_chunks);
 412        p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
 413        err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 414                         num_mmio_pages, PCI_DMA_BIDIRECTIONAL);
 415        if (err != num_mmio_pages)
 416                goto scif_p2p_free;
 417        err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
 418                         num_aper_chunks, PCI_DMA_BIDIRECTIONAL);
 419        if (err != num_aper_chunks)
 420                goto dma_unmap;
 421        p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
 422        p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
 423        p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
 424        p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
 425        p2p->ppi_peer_id = peerdev->node;
 426        return p2p;
 427dma_unmap:
 428        dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 429                     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
 430scif_p2p_free:
 431        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
 432        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
 433free_p2p:
 434        kfree(p2p);
 435        return NULL;
 436}
 437
 438/* Uninitialize and release resources from a p2p mapping */
 439static void scif_deinit_p2p_info(struct scif_dev *scifdev,
 440                                 struct scif_p2p_info *p2p)
 441{
 442        struct scif_hw_dev *sdev = scifdev->sdev;
 443
 444        dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 445                     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
 446        dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
 447                     p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL);
 448        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
 449        scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
 450        kfree(p2p);
 451}
 452
/**
 * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message
 * @scifdev: Source node requesting the connection ("j" below)
 * @dst: Destination node id ("i" below)
 *
 * Connect the src and dst node by setting up the p2p connection
 * between them. Management node here acts like a proxy.
 */
static void scif_node_connect(struct scif_dev *scifdev, int dst)
{
	struct scif_dev *dev_j = scifdev;
	struct scif_dev *dev_i = NULL;
	struct scif_p2p_info *p2p_ij = NULL;    /* bus addr for j from i */
	struct scif_p2p_info *p2p_ji = NULL;    /* bus addr for i from j */
	struct scif_p2p_info *p2p;
	struct list_head *pos, *tmp;
	struct scifmsg msg;
	int err;
	u64 tmppayload;

	/* Reject out-of-range destination node ids */
	if (dst < 1 || dst > scif_info.maxid)
		return;

	dev_i = &scif_dev[dst];

	if (!_scifdev_alive(dev_i))
		return;
	/*
	 * If the p2p connection is already setup or in the process of setting
	 * up then just ignore this request. The requested node will get
	 * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
	 */
	if (!list_empty(&dev_i->p2p)) {
		list_for_each_safe(pos, tmp, &dev_i->p2p) {
			p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
			if (p2p->ppi_peer_id == dev_j->node)
				return;
		}
	}
	/* Map each node's bars into the other; undo the first on failure */
	p2p_ij = scif_init_p2p_info(dev_i, dev_j);
	if (!p2p_ij)
		return;
	p2p_ji = scif_init_p2p_info(dev_j, dev_i);
	if (!p2p_ji) {
		scif_deinit_p2p_info(dev_i, p2p_ij);
		return;
	}
	list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
	list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);

	/*
	 * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
	 * as seen from dev_j
	 */
	msg.uop = SCIF_NODE_ADD;
	msg.src.node = dev_j->node;
	msg.dst.node = dev_i->node;

	msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
	msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
	msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
	msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;

	err = scif_nodeqp_send(dev_i,  &msg);
	if (err) {
		/*
		 * NOTE(review): the p2p entries stay on the lists here;
		 * presumably they are reclaimed during node removal --
		 * confirm against the teardown path.
		 */
		dev_err(&scifdev->sdev->dev,
			"%s %d error %d\n", __func__, __LINE__, err);
		return;
	}

	/* Same as above but to dev_j */
	msg.uop = SCIF_NODE_ADD;
	msg.src.node = dev_i->node;
	msg.dst.node = dev_j->node;

	/* Swap the aperture addresses so each side sees the other's */
	tmppayload = msg.payload[0];
	msg.payload[0] = msg.payload[2];
	msg.payload[2] = tmppayload;
	msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
	msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;

	scif_nodeqp_send(dev_j, &msg);
}
 535
 536static void scif_p2p_setup(void)
 537{
 538        int i, j;
 539
 540        if (!scif_info.p2p_enable)
 541                return;
 542
 543        for (i = 1; i <= scif_info.maxid; i++)
 544                if (!_scifdev_alive(&scif_dev[i]))
 545                        return;
 546
 547        for (i = 1; i <= scif_info.maxid; i++) {
 548                for (j = 1; j <= scif_info.maxid; j++) {
 549                        struct scif_dev *scifdev = &scif_dev[i];
 550
 551                        if (i == j)
 552                                continue;
 553                        scif_node_connect(scifdev, j);
 554                }
 555        }
 556}
 557
/*
 * Human-readable message names indexed by message uop value; used only
 * for debug logging in scif_display_message(). NOTE(review): the order
 * must stay in sync with the message opcode definitions -- confirm
 * against scif_nodeqp.h before adding entries.
 */
static char *message_types[] = {"BAD",
				"INIT",
				"EXIT",
				"SCIF_EXIT_ACK",
				"SCIF_NODE_ADD",
				"SCIF_NODE_ADD_ACK",
				"SCIF_NODE_ADD_NACK",
				"REMOVE_NODE",
				"REMOVE_NODE_ACK",
				"CNCT_REQ",
				"CNCT_GNT",
				"CNCT_GNTACK",
				"CNCT_GNTNACK",
				"CNCT_REJ",
				"DISCNCT",
				"DISCNT_ACK",
				"CLIENT_SENT",
				"CLIENT_RCVD",
				"SCIF_GET_NODE_INFO",
				"REGISTER",
				"REGISTER_ACK",
				"REGISTER_NACK",
				"UNREGISTER",
				"UNREGISTER_ACK",
				"UNREGISTER_NACK",
				"ALLOC_REQ",
				"ALLOC_GNT",
				"ALLOC_REJ",
				"FREE_PHYS",
				"FREE_VIRT",
				"MUNMAP",
				"MARK",
				"MARK_ACK",
				"MARK_NACK",
				"WAIT",
				"WAIT_ACK",
				"WAIT_NACK",
				"SIGNAL_LOCAL",
				"SIGNAL_REMOTE",
				"SIG_ACK",
				"SIG_NACK"};
 599
 600static void
 601scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
 602                     const char *label)
 603{
 604        if (!scif_info.en_msg_log)
 605                return;
 606        if (msg->uop > SCIF_MAX_MSG) {
 607                dev_err(&scifdev->sdev->dev,
 608                        "%s: unknown msg type %d\n", label, msg->uop);
 609                return;
 610        }
 611        dev_info(&scifdev->sdev->dev,
 612                 "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
 613                 label, message_types[msg->uop], msg->src.node, msg->src.port,
 614                 msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
 615                 msg->payload[2], msg->payload[3]);
 616}
 617
/*
 * _scif_nodeqp_send() - write a message to the outbound ring and notify
 * the peer.
 * @scifdev: SCIF device of the peer node
 * @msg: message to send (copied into the ring buffer)
 *
 * Busy-waits in 1ms steps (up to ~3s) for ring space, commits the write
 * under qp->send_lock, then raises an interrupt on the peer -- or, for
 * the loopback device, queues the interrupt bottom-half work directly.
 * Returns 0 on success or a negative errno (-EINVAL if the QP is not
 * set up, -ENODEV on timeout).
 */
int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_qp *qp = scifdev->qpairs;
	int err = -ENOMEM, loop_cnt = 0;

	scif_display_message(scifdev, msg, "Sent");
	if (!qp) {
		err = -EINVAL;
		goto error;
	}
	spin_lock(&qp->send_lock);

	/* Retry while the ring is full, bounded by the timeout below */
	while ((err = scif_rb_write(&qp->outbound_q,
				    msg, sizeof(struct scifmsg)))) {
		mdelay(1);
#define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
		if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
			err = -ENODEV;
			break;
		}
	}
	if (!err)
		scif_rb_commit(&qp->outbound_q);
	spin_unlock(&qp->send_lock);
	if (!err) {
		if (scifdev_self(scifdev))
			/*
			 * For loopback we need to emulate an interrupt by
			 * queuing work for the queue handling real node
			 * Qp interrupts.
			 */
			queue_work(scifdev->intr_wq, &scifdev->intr_bh);
		else
			scif_send_msg_intr(scifdev);
	}
error:
	if (err)
		dev_dbg(&scifdev->sdev->dev,
			"%s %d error %d uop %d\n",
			 __func__, __LINE__, err, msg->uop);
	return err;
}
 660
 661/**
 662 * scif_nodeqp_send - Send a message on the node queue pair
 663 * @scifdev: Scif Device.
 664 * @msg: The message to be sent.
 665 */
 666int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
 667{
 668        int err;
 669        struct device *spdev = NULL;
 670
 671        if (msg->uop > SCIF_EXIT_ACK) {
 672                /* Dont send messages once the exit flow has begun */
 673                if (OP_IDLE != scifdev->exit)
 674                        return -ENODEV;
 675                spdev = scif_get_peer_dev(scifdev);
 676                if (IS_ERR(spdev)) {
 677                        err = PTR_ERR(spdev);
 678                        return err;
 679                }
 680        }
 681        err = _scif_nodeqp_send(scifdev, msg);
 682        if (msg->uop > SCIF_EXIT_ACK)
 683                scif_put_peer_dev(spdev);
 684        return err;
 685}
 686
/*
 * scif_misc_handler:
 * @work: work_struct (unused; the handler reads no per-work state)
 *
 * Work queue handler for servicing miscellaneous SCIF tasks.
 * Examples include:
 * 1) Remote fence requests.
 * 2) Destruction of temporary registered windows
 *    created during scif_vreadfrom()/scif_vwriteto().
 * 3) Cleanup of zombie endpoints.
 */
void scif_misc_handler(struct work_struct *work)
{
	scif_rma_handle_remote_fences();
	scif_rma_destroy_windows();
	scif_rma_destroy_tcw_invalid();
	scif_cleanup_zombie_epd();
}
 704
/**
 * scif_init() - Respond to SCIF_INIT interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message (unused; arrival alone triggers init)
 */
static __always_inline void
scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
{
	/*
	 * Allow the thread waiting for device page updates for the peer QP DMA
	 * address to complete initializing the inbound_q.
	 */
	flush_delayed_work(&scifdev->qp_dwork);

	scif_peer_register_device(scifdev);

	/* Only the mgmt node orchestrates p2p setup, under conflock */
	if (scif_is_mgmt_node()) {
		mutex_lock(&scif_info.conflock);
		scif_p2p_setup();
		mutex_unlock(&scif_info.conflock);
	}
}
 727
/**
 * scif_exit() - Respond to SCIF_EXIT interrupt message
 * @scifdev:    Remote SCIF device node
 * @unused:     Interrupt message (unused)
 *
 * This function stops the SCIF interface for the node which sent
 * the SCIF_EXIT message and starts waiting for that node to
 * resetup the queue pair again.
 */
static __always_inline void
scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
{
	/* Remember that the peer still expects a SCIF_EXIT_ACK */
	scifdev->exit_ack_pending = true;
	if (scif_is_mgmt_node())
		scif_disconnect_node(scifdev->node, false);
	else
		scif_stop(scifdev);
	/* Re-arm polling for the peer's QP to come back up */
	schedule_delayed_work(&scifdev->qp_dwork,
			      msecs_to_jiffies(1000));
}
 748
/**
 * scif_exit_ack() - Respond to SCIF_EXIT_ACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @unused:     Interrupt message (unused)
 *
 * Marks the exit handshake complete and wakes the waiter in
 * scif_send_exit().
 */
static __always_inline void
scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
{
	scifdev->exit = OP_COMPLETED;
	wake_up(&scif_info.exitwq);
}
 761
/**
 * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * When the mgmt node driver has finished initializing a MIC node queue pair it
 * marks the node as online. It then looks for all currently online MIC cards
 * and sends a SCIF_NODE_ADD message to identify the ID of the new card for
 * peer to peer initialization
 *
 * The local node allocates its incoming queue and sends its address in the
 * SCIF_NODE_ADD_ACK message back to the mgmt node, the mgmt node "reflects"
 * this message to the new node
 */
static __always_inline void
scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *newdev;
	dma_addr_t qp_offset;
	int qp_connect;
	struct scif_hw_dev *sdev;

	dev_dbg(&scifdev->sdev->dev,
		"Scifdev %d:%d received NODE_ADD msg for node %d\n",
		scifdev->node, msg->dst.node, msg->src.node);
	dev_dbg(&scifdev->sdev->dev,
		"Remote address for this node's aperture %llx\n",
		msg->payload[0]);
	/* The new peer device is reached through the mgmt node's hw device */
	newdev = &scif_dev[msg->src.node];
	newdev->node = msg->src.node;
	newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
	sdev = newdev->sdev;

	if (scif_setup_intr_wq(newdev)) {
		dev_err(&scifdev->sdev->dev,
			"failed to setup interrupts for %d\n", msg->src.node);
		goto interrupt_setup_error;
	}
	/* payload[1] carries the peer's MMIO base address */
	newdev->mmio.va = ioremap_nocache(msg->payload[1], sdev->mmio->len);
	if (!newdev->mmio.va) {
		dev_err(&scifdev->sdev->dev,
			"failed to map mmio for %d\n", msg->src.node);
		goto mmio_map_error;
	}
	newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
	if (!newdev->qpairs)
		goto qp_alloc_error;
	/*
	 * Set the base address of the remote node's memory since it gets
	 * added to qp_offset
	 */
	newdev->base_addr = msg->payload[0];

	qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
					   SCIF_NODE_QP_SIZE, newdev);
	if (qp_connect) {
		dev_err(&scifdev->sdev->dev,
			"failed to setup qp_connect %d\n", qp_connect);
		goto qp_connect_error;
	}

	newdev->db = sdev->hw_ops->next_db(sdev);
	newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
						   "SCIF_INTR", newdev,
						   newdev->db);
	if (IS_ERR(newdev->cookie))
		goto qp_connect_error;
	newdev->qpairs->magic = SCIFEP_MAGIC;
	newdev->qpairs->qp_state = SCIF_QP_OFFLINE;

	/*
	 * Reply with our inbound QP offset and doorbell; the mgmt node
	 * "reflects" this SCIF_NODE_ADD_ACK to the new node.
	 */
	msg->uop = SCIF_NODE_ADD_ACK;
	msg->dst.node = msg->src.node;
	msg->src.node = scif_info.nodeid;
	msg->payload[0] = qp_offset;
	msg->payload[2] = newdev->db;
	scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
	return;
qp_connect_error:
	/*
	 * NOTE(review): the intr wq set up above is not destroyed on these
	 * error paths, and any QP buffer allocated by scif_setup_qp_connect()
	 * is not freed when request_irq() fails — confirm whether a later
	 * scif_cleanup_scifdev() reclaims these.
	 */
	kfree(newdev->qpairs);
	newdev->qpairs = NULL;
qp_alloc_error:
	iounmap(newdev->mmio.va);
	newdev->mmio.va = NULL;
mmio_map_error:
interrupt_setup_error:
	dev_err(&scifdev->sdev->dev,
		"node add failed for node %d\n", msg->src.node);
	msg->uop = SCIF_NODE_ADD_NACK;
	msg->dst.node = msg->src.node;
	msg->src.node = scif_info.nodeid;
	scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
}
 854
/**
 * scif_poll_qp_state() - Poll a P2P queue pair until it comes online
 * @work: delayed work embedded in struct scif_dev (p2p_dwork)
 *
 * Reschedules itself every SCIF_NODE_QP_TIMEOUT ms until both the local
 * and remote sides of qpairs[0] report SCIF_QP_ONLINE. After
 * SCIF_NODE_QP_RETRY failed checks the peer is declared offline,
 * unregistered and cleaned up.
 */
void scif_poll_qp_state(struct work_struct *work)
{
#define SCIF_NODE_QP_RETRY 100
#define SCIF_NODE_QP_TIMEOUT 100
	struct scif_dev *peerdev = container_of(work, struct scif_dev,
							p2p_dwork.work);
	struct scif_qp *qp = &peerdev->qpairs[0];

	if (qp->qp_state != SCIF_QP_ONLINE ||
	    qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
		if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
			dev_err(&peerdev->sdev->dev,
				"Warning: QP check timeout with state %d\n",
				qp->qp_state);
			goto timeout;
		}
		schedule_delayed_work(&peerdev->p2p_dwork,
				      msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
		return;
	}
	return;
timeout:
	dev_err(&peerdev->sdev->dev,
		"%s %d remote node %d offline,  state = 0x%x\n",
		__func__, __LINE__, peerdev->node, qp->qp_state);
	qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
	scif_peer_unregister_device(peerdev);
	scif_cleanup_scifdev(peerdev);
}
 884
/**
 * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * On the mgmt node this message is forwarded ("reflected") from the new
 * node to the destination node. On a MIC node it completes the local end
 * of the P2P queue pair using the offset in payload[0], marks the remote
 * QP online, registers the peer and kicks off scif_poll_qp_state() via
 * p2p_dwork.
 */
static __always_inline void
scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *peerdev;
	struct scif_qp *qp;
	struct scif_dev *dst_dev = &scif_dev[msg->dst.node];

	dev_dbg(&scifdev->sdev->dev,
		"Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
		scifdev->node, msg->src.node, msg->dst.node);
	dev_dbg(&scifdev->sdev->dev,
		"payload %llx %llx %llx %llx\n", msg->payload[0],
		msg->payload[1], msg->payload[2], msg->payload[3]);
	if (scif_is_mgmt_node()) {
		/*
		 * the lock serializes with scif_qp_response_ack. The mgmt node
		 * is forwarding the NODE_ADD_ACK message from src to dst we
		 * need to make sure that the dst has already received a
		 * NODE_ADD for src and setup its end of the qp to dst
		 */
		mutex_lock(&scif_info.conflock);
		msg->payload[1] = scif_info.maxid;
		scif_nodeqp_send(dst_dev, msg);
		mutex_unlock(&scif_info.conflock);
		return;
	}
	peerdev = &scif_dev[msg->src.node];
	peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
	peerdev->node = msg->src.node;

	/*
	 * NOTE(review): assumes qpairs was allocated when this node handled
	 * SCIF_NODE_ADD for src — confirm the message ordering guarantees it.
	 */
	qp = &peerdev->qpairs[0];

	if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
					    msg->payload[0])))
		goto local_error;
	peerdev->rdb = msg->payload[2];
	qp->remote_qp->qp_state = SCIF_QP_ONLINE;

	scif_peer_register_device(peerdev);

	schedule_delayed_work(&peerdev->p2p_dwork, 0);
	return;
local_error:
	scif_cleanup_scifdev(peerdev);
}
 939
 940/**
 941 * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message
 942 * @msg:        Interrupt message
 943 *
 944 * SCIF_NODE_ADD failed, so inform the waiting wq.
 945 */
 946static __always_inline void
 947scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
 948{
 949        if (scif_is_mgmt_node()) {
 950                struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
 951
 952                dev_dbg(&scifdev->sdev->dev,
 953                        "SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
 954                scif_nodeqp_send(dst_dev, msg);
 955        }
 956}
 957
 958/*
 959 * scif_node_remove: Handle SCIF_NODE_REMOVE message
 960 * @msg: Interrupt message
 961 *
 962 * Handle node removal.
 963 */
 964static __always_inline void
 965scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
 966{
 967        int node = msg->payload[0];
 968        struct scif_dev *scdev = &scif_dev[node];
 969
 970        scdev->node_remove_ack_pending = true;
 971        scif_handle_remove_node(node);
 972}
 973
 974/*
 975 * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message
 976 * @msg: Interrupt message
 977 *
 978 * The peer has acked a SCIF_NODE_REMOVE message.
 979 */
 980static __always_inline void
 981scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
 982{
 983        struct scif_dev *sdev = &scif_dev[msg->payload[0]];
 984
 985        atomic_inc(&sdev->disconn_rescnt);
 986        wake_up(&sdev->disconn_wq);
 987}
 988
/**
 * scif_get_node_info_resp: Respond to SCIF_GET_NODE_INFO interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * On the mgmt node, answer the request with maxid and total under
 * conflock. On other nodes, store the values received from the mgmt node
 * and complete the requester's completion stashed in payload[3].
 */
static __always_inline void
scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
{
	if (scif_is_mgmt_node()) {
		/* Turn the request around: the reply goes back to the sender */
		swap(msg->dst.node, msg->src.node);
		mutex_lock(&scif_info.conflock);
		msg->payload[1] = scif_info.maxid;
		msg->payload[2] = scif_info.total;
		mutex_unlock(&scif_info.conflock);
		scif_nodeqp_send(scifdev, msg);
	} else {
		/* payload[3] carries the requester's completion pointer */
		struct completion *node_info =
			(struct completion *)msg->payload[3];

		mutex_lock(&scif_info.conflock);
		scif_info.maxid = msg->payload[1];
		scif_info.total = msg->payload[2];
		complete_all(node_info);
		mutex_unlock(&scif_info.conflock);
	}
}
1016
1017static void
1018scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
1019{
1020        /* Bogus Node Qp Message? */
1021        dev_err(&scifdev->sdev->dev,
1022                "Unknown message 0x%xn scifdev->node 0x%x\n",
1023                msg->uop, scifdev->node);
1024}
1025
/*
 * Dispatch table for node QP messages, indexed by scifmsg.uop.
 * One entry per uop, SCIF_MAX_MSG + 1 entries total; slot 0 is the
 * error/unknown-message handler. Keep in sync with the SCIF_* uop values.
 */
static void (*scif_intr_func[SCIF_MAX_MSG + 1])
	    (struct scif_dev *, struct scifmsg *msg) = {
	scif_msg_unknown,	/* Error */
	scif_init,		/* SCIF_INIT */
	scif_exit,		/* SCIF_EXIT */
	scif_exit_ack,		/* SCIF_EXIT_ACK */
	scif_node_add,		/* SCIF_NODE_ADD */
	scif_node_add_ack,	/* SCIF_NODE_ADD_ACK */
	scif_node_add_nack,	/* SCIF_NODE_ADD_NACK */
	scif_node_remove,	/* SCIF_NODE_REMOVE */
	scif_node_remove_ack,	/* SCIF_NODE_REMOVE_ACK */
	scif_cnctreq,		/* SCIF_CNCT_REQ */
	scif_cnctgnt,		/* SCIF_CNCT_GNT */
	scif_cnctgnt_ack,	/* SCIF_CNCT_GNTACK */
	scif_cnctgnt_nack,	/* SCIF_CNCT_GNTNACK */
	scif_cnctrej,		/* SCIF_CNCT_REJ */
	scif_discnct,		/* SCIF_DISCNCT */
	scif_discnt_ack,	/* SCIF_DISCNT_ACK */
	scif_clientsend,	/* SCIF_CLIENT_SENT */
	scif_clientrcvd,	/* SCIF_CLIENT_RCVD */
	scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
	scif_recv_reg,		/* SCIF_REGISTER */
	scif_recv_reg_ack,	/* SCIF_REGISTER_ACK */
	scif_recv_reg_nack,	/* SCIF_REGISTER_NACK */
	scif_recv_unreg,	/* SCIF_UNREGISTER */
	scif_recv_unreg_ack,	/* SCIF_UNREGISTER_ACK */
	scif_recv_unreg_nack,	/* SCIF_UNREGISTER_NACK */
	scif_alloc_req,		/* SCIF_ALLOC_REQ */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_GNT */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_REJ */
	scif_free_virt,		/* SCIF_FREE_VIRT */
	scif_recv_munmap,	/* SCIF_MUNMAP */
	scif_recv_mark,		/* SCIF_MARK */
	scif_recv_mark_resp,	/* SCIF_MARK_ACK */
	scif_recv_mark_resp,	/* SCIF_MARK_NACK */
	scif_recv_wait,		/* SCIF_WAIT */
	scif_recv_wait_resp,	/* SCIF_WAIT_ACK */
	scif_recv_wait_resp,	/* SCIF_WAIT_NACK */
	scif_recv_sig_local,	/* SCIF_SIG_LOCAL */
	scif_recv_sig_remote,	/* SCIF_SIG_REMOTE */
	scif_recv_sig_resp,	/* SCIF_SIG_ACK */
	scif_recv_sig_resp,	/* SCIF_SIG_NACK */
};
1069
1070/**
1071 * scif_nodeqp_msg_handler() - Common handler for node messages
1072 * @scifdev: Remote device to respond to
1073 * @qp: Remote memory pointer
1074 * @msg: The message to be handled.
1075 *
1076 * This routine calls the appropriate routine to handle a Node Qp
1077 * message receipt
1078 */
1079static int scif_max_msg_id = SCIF_MAX_MSG;
1080
1081static void
1082scif_nodeqp_msg_handler(struct scif_dev *scifdev,
1083                        struct scif_qp *qp, struct scifmsg *msg)
1084{
1085        scif_display_message(scifdev, msg, "Rcvd");
1086
1087        if (msg->uop > (u32)scif_max_msg_id) {
1088                /* Bogus Node Qp Message? */
1089                dev_err(&scifdev->sdev->dev,
1090                        "Unknown message 0x%xn scifdev->node 0x%x\n",
1091                        msg->uop, scifdev->node);
1092                return;
1093        }
1094
1095        scif_intr_func[msg->uop](scifdev, msg);
1096}
1097
1098/**
1099 * scif_nodeqp_intrhandler() - Interrupt handler for node messages
1100 * @scifdev:    Remote device to respond to
1101 * @qp:         Remote memory pointer
1102 *
1103 * This routine is triggered by the interrupt mechanism.  It reads
1104 * messages from the node queue RB and calls the Node QP Message handling
1105 * routine.
1106 */
1107void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
1108{
1109        struct scifmsg msg;
1110        int read_size;
1111
1112        do {
1113                read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
1114                if (!read_size)
1115                        break;
1116                scif_nodeqp_msg_handler(scifdev, qp, &msg);
1117                /*
1118                 * The node queue pair is unmapped so skip the read pointer
1119                 * update after receipt of a SCIF_EXIT_ACK
1120                 */
1121                if (SCIF_EXIT_ACK == msg.uop)
1122                        break;
1123                scif_rb_update_read_ptr(&qp->inbound_q);
1124        } while (1);
1125}
1126
1127/**
1128 * scif_loopb_wq_handler - Loopback Workqueue Handler.
1129 * @work: loop back work
1130 *
1131 * This work queue routine is invoked by the loopback work queue handler.
1132 * It grabs the recv lock, dequeues any available messages from the head
1133 * of the loopback message list, calls the node QP message handler,
1134 * waits for it to return, then frees up this message and dequeues more
1135 * elements of the list if available.
1136 */
1137static void scif_loopb_wq_handler(struct work_struct *unused)
1138{
1139        struct scif_dev *scifdev = scif_info.loopb_dev;
1140        struct scif_qp *qp = scifdev->qpairs;
1141        struct scif_loopb_msg *msg;
1142
1143        do {
1144                msg = NULL;
1145                spin_lock(&qp->recv_lock);
1146                if (!list_empty(&scif_info.loopb_recv_q)) {
1147                        msg = list_first_entry(&scif_info.loopb_recv_q,
1148                                               struct scif_loopb_msg,
1149                                               list);
1150                        list_del(&msg->list);
1151                }
1152                spin_unlock(&qp->recv_lock);
1153
1154                if (msg) {
1155                        scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
1156                        kfree(msg);
1157                }
1158        } while (msg);
1159}
1160
/**
 * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
 * @scifdev: SCIF device
 * @qp: Queue pair.
 *
 * This work queue routine is triggered when a loopback message is received.
 *
 * We need special handling for receiving Node Qp messages on a loopback SCIF
 * device via two workqueues for receiving messages.
 *
 * The reason we need the extra workqueue which is not required with *normal*
 * non-loopback SCIF devices is the potential classic deadlock described below:
 *
 * Thread A tries to send a message on a loopback SCIF device and blocks since
 * there is no space in the RB while it has the send_lock held or another
 * lock called lock X for example.
 *
 * Thread B: The Loopback Node QP message receive workqueue receives the message
 * and tries to send a message (eg an ACK) to the loopback SCIF device. It tries
 * to grab the send lock again or lock X and deadlocks with Thread A. The RB
 * cannot be drained any further due to this classic deadlock.
 *
 * In order to avoid deadlocks as mentioned above we have an extra level of
 * indirection achieved by having two workqueues.
 * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
 * messages from the Node QP RB, adds them to a list and queues work for the
 * second workqueue.
 *
 * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
 * messages from the list, handles them, frees up the memory and dequeues
 * more elements of the list if available.
 *
 * Return: -ENOMEM on allocation failure, otherwise the size of the last
 * (short or zero) read from the ring buffer.
 */
int
scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
{
	int read_size;
	struct scif_loopb_msg *msg;

	do {
		/* Allocate before reading so a full message is never dropped */
		msg = kmalloc(sizeof(*msg), GFP_KERNEL);
		if (!msg)
			return -ENOMEM;
		read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
					     sizeof(struct scifmsg));
		if (read_size != sizeof(struct scifmsg)) {
			/* Short/empty read: nothing left to enqueue */
			kfree(msg);
			scif_rb_update_read_ptr(&qp->inbound_q);
			break;
		}
		spin_lock(&qp->recv_lock);
		list_add_tail(&msg->list, &scif_info.loopb_recv_q);
		spin_unlock(&qp->recv_lock);
		/* Hand the queued message to scif_loopb_wq_handler */
		queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
		scif_rb_update_read_ptr(&qp->inbound_q);
	} while (read_size == sizeof(struct scifmsg));
	return read_size;
}
1218
/**
 * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
 * @scifdev: SCIF device
 *
 * Sets up the required loopback workqueues, queue pairs and ring buffers
 *
 * Return: 0 on success, negative errno on failure; all resources acquired
 * before the failure point are released via the goto unwind chain.
 */
int scif_setup_loopback_qp(struct scif_dev *scifdev)
{
	int err = 0;
	void *local_q;
	struct scif_qp *qp;

	err = scif_setup_intr_wq(scifdev);
	if (err)
		goto exit;
	INIT_LIST_HEAD(&scif_info.loopb_recv_q);
	snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
		 "SCIF LOOPB %d", scifdev->node);
	scif_info.loopb_wq =
		alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
	if (!scif_info.loopb_wq) {
		err = -ENOMEM;
		goto destroy_intr;
	}
	INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
	/* Allocate Self Qpair */
	scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
	if (!scifdev->qpairs) {
		err = -ENOMEM;
		goto destroy_loopb_wq;
	}

	qp = scifdev->qpairs;
	qp->magic = SCIFEP_MAGIC;
	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);

	local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
	if (!local_q) {
		err = -ENOMEM;
		goto free_qpairs;
	}
	/*
	 * For loopback the inbound_q and outbound_q are essentially the same
	 * since the Node sends a message on the loopback interface to the
	 * outbound_q which is then received on the inbound_q.
	 */
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(SCIF_NODE_QP_SIZE));

	scif_rb_init(&qp->inbound_q,
		     &qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(SCIF_NODE_QP_SIZE));
	scif_info.nodeid = scifdev->node;

	scif_peer_register_device(scifdev);

	scif_info.loopb_dev = scifdev;
	return err;
free_qpairs:
	/* NOTE(review): scifdev->qpairs is left dangling here — confirm
	 * callers treat a failed setup as fatal and never touch it. */
	kfree(scifdev->qpairs);
destroy_loopb_wq:
	destroy_workqueue(scif_info.loopb_wq);
destroy_intr:
	scif_destroy_intr_wq(scifdev);
exit:
	return err;
}
1290
1291/**
1292 * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
1293 * @scifdev: SCIF device
1294 *
1295 * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
1296 */
1297int scif_destroy_loopback_qp(struct scif_dev *scifdev)
1298{
1299        scif_peer_unregister_device(scifdev);
1300        destroy_workqueue(scif_info.loopb_wq);
1301        scif_destroy_intr_wq(scifdev);
1302        kfree(scifdev->qpairs->outbound_q.rb_base);
1303        kfree(scifdev->qpairs);
1304        scifdev->sdev = NULL;
1305        scif_info.loopb_dev = NULL;
1306        return 0;
1307}
1308
1309void scif_destroy_p2p(struct scif_dev *scifdev)
1310{
1311        struct scif_dev *peer_dev;
1312        struct scif_p2p_info *p2p;
1313        struct list_head *pos, *tmp;
1314        int bd;
1315
1316        mutex_lock(&scif_info.conflock);
1317        /* Free P2P mappings in the given node for all its peer nodes */
1318        list_for_each_safe(pos, tmp, &scifdev->p2p) {
1319                p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1320                dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
1321                             p2p->sg_nentries[SCIF_PPI_MMIO],
1322                             DMA_BIDIRECTIONAL);
1323                dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
1324                             p2p->sg_nentries[SCIF_PPI_APER],
1325                             DMA_BIDIRECTIONAL);
1326                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1327                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1328                list_del(pos);
1329                kfree(p2p);
1330        }
1331
1332        /* Free P2P mapping created in the peer nodes for the given node */
1333        for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
1334                peer_dev = &scif_dev[bd];
1335                list_for_each_safe(pos, tmp, &peer_dev->p2p) {
1336                        p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1337                        if (p2p->ppi_peer_id == scifdev->node) {
1338                                dma_unmap_sg(&peer_dev->sdev->dev,
1339                                             p2p->ppi_sg[SCIF_PPI_MMIO],
1340                                             p2p->sg_nentries[SCIF_PPI_MMIO],
1341                                             DMA_BIDIRECTIONAL);
1342                                dma_unmap_sg(&peer_dev->sdev->dev,
1343                                             p2p->ppi_sg[SCIF_PPI_APER],
1344                                             p2p->sg_nentries[SCIF_PPI_APER],
1345                                             DMA_BIDIRECTIONAL);
1346                                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1347                                scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1348                                list_del(pos);
1349                                kfree(p2p);
1350                        }
1351                }
1352        }
1353        mutex_unlock(&scif_info.conflock);
1354}
1355