linux/fs/cifs/smbdirect.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 *   Copyright (C) 2017, Microsoft Corporation.
   4 *
   5 *   Author(s): Long Li <longli@microsoft.com>
   6 */
   7#include <linux/module.h>
   8#include <linux/highmem.h>
   9#include "smbdirect.h"
  10#include "cifs_debug.h"
  11#include "cifsproto.h"
  12#include "smb2proto.h"
  13
  14static struct smbd_response *get_empty_queue_buffer(
  15                struct smbd_connection *info);
  16static struct smbd_response *get_receive_buffer(
  17                struct smbd_connection *info);
  18static void put_receive_buffer(
  19                struct smbd_connection *info,
  20                struct smbd_response *response);
  21static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  22static void destroy_receive_buffers(struct smbd_connection *info);
  23
  24static void put_empty_packet(
  25                struct smbd_connection *info, struct smbd_response *response);
  26static void enqueue_reassembly(
  27                struct smbd_connection *info,
  28                struct smbd_response *response, int data_length);
  29static struct smbd_response *_get_first_reassembly(
  30                struct smbd_connection *info);
  31
  32static int smbd_post_recv(
  33                struct smbd_connection *info,
  34                struct smbd_response *response);
  35
  36static int smbd_post_send_empty(struct smbd_connection *info);
  37static int smbd_post_send_data(
  38                struct smbd_connection *info,
  39                struct kvec *iov, int n_vec, int remaining_data_length);
  40static int smbd_post_send_page(struct smbd_connection *info,
  41                struct page *page, unsigned long offset,
  42                size_t size, int remaining_data_length);
  43
  44static void destroy_mr_list(struct smbd_connection *info);
  45static int allocate_mr_list(struct smbd_connection *info);
  46
  47/* SMBD version number */
  48#define SMBD_V1 0x0100
  49
  50/* Port numbers for SMBD transport */
  51#define SMB_PORT        445
  52#define SMBD_PORT       5445
  53
  54/* Address lookup and resolve timeout in ms */
  55#define RDMA_RESOLVE_TIMEOUT    5000
  56
  57/* SMBD negotiation timeout in seconds */
  58#define SMBD_NEGOTIATE_TIMEOUT  120
  59
  60/* SMBD minimum receive size and fragmented size, as defined in [MS-SMBD] */
  61#define SMBD_MIN_RECEIVE_SIZE           128
  62#define SMBD_MIN_FRAGMENTED_SIZE        131072
  63
  64/*
  65 * Default maximum number of outstanding RDMA read/write operations on this connection
  66 * This value may be decreased during QP creation, if limited by the hardware
  67 */
  68#define SMBD_CM_RESPONDER_RESOURCES     32
  69
  70/* Maximum number of retries on data transfer operations */
  71#define SMBD_CM_RETRY                   6
  72/* No need to retry on Receiver Not Ready since SMBD manages credits */
  73#define SMBD_CM_RNR_RETRY               0
  74
  75/*
  76 * User configurable initial values per SMBD transport connection
  77 * as defined in [MS-SMBD] 3.1.1.1
  78 * Those may change after a SMBD negotiation
  79 */
  80/* The local peer's maximum number of credits to grant to the peer */
  81int smbd_receive_credit_max = 255;
  82
  83/* The number of credits the remote peer requests from the local peer */
  84int smbd_send_credit_target = 255;
  85
  86/* The maximum size of a single message that can be sent to the remote peer */
  87int smbd_max_send_size = 1364;
  88
  89/*  The maximum fragmented upper-layer payload receive size supported */
  90int smbd_max_fragmented_recv_size = 1024 * 1024;
  91
  92/*  The maximum single-message size which can be received */
  93int smbd_max_receive_size = 8192;
  94
  95/* The timeout to initiate send of a keepalive message on idle */
  96int smbd_keep_alive_interval = 120;
  97
  98/*
  99 * User configurable initial values for RDMA transport
 100 * The actual values used may be lower and are limited by hardware capabilities
 101 */
 102/* Default maximum number of SGEs in a RDMA write/read */
 103int smbd_max_frmr_depth = 2048;
 104
 105/* If the payload is smaller than this many bytes, use RDMA send/recv, not read/write */
 106int rdma_readwrite_threshold = 4096;
 107
 108/* Transport logging functions
 109 * Logging is defined as classes. They can be OR'ed to select what is logged
 110 * via the module parameter smbd_logging_class
 111 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 112 * log_rdma_event()
 113 */
 114#define LOG_OUTGOING                    0x1
 115#define LOG_INCOMING                    0x2
 116#define LOG_READ                        0x4
 117#define LOG_WRITE                       0x8
 118#define LOG_RDMA_SEND                   0x10
 119#define LOG_RDMA_RECV                   0x20
 120#define LOG_KEEP_ALIVE                  0x40
 121#define LOG_RDMA_EVENT                  0x80
 122#define LOG_RDMA_MR                     0x100
 123static unsigned int smbd_logging_class;
 124module_param(smbd_logging_class, uint, 0644);
 125MODULE_PARM_DESC(smbd_logging_class,
 126        "Logging class for SMBD transport 0x0 to 0x100");
 127
 128#define ERR             0x0
 129#define INFO            0x1
 130static unsigned int smbd_logging_level = ERR;
 131module_param(smbd_logging_level, uint, 0644);
 132MODULE_PARM_DESC(smbd_logging_level,
 133        "Logging level for SMBD transport, 0 (default): error, 1: info");
 134
 135#define log_rdma(level, class, fmt, args...)                            \
 136do {                                                                    \
 137        if (level <= smbd_logging_level || class & smbd_logging_class)  \
 138                cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 139} while (0)
 140
 141#define log_outgoing(level, fmt, args...) \
 142                log_rdma(level, LOG_OUTGOING, fmt, ##args)
 143#define log_incoming(level, fmt, args...) \
 144                log_rdma(level, LOG_INCOMING, fmt, ##args)
 145#define log_read(level, fmt, args...)   log_rdma(level, LOG_READ, fmt, ##args)
 146#define log_write(level, fmt, args...)  log_rdma(level, LOG_WRITE, fmt, ##args)
 147#define log_rdma_send(level, fmt, args...) \
 148                log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 149#define log_rdma_recv(level, fmt, args...) \
 150                log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 151#define log_keep_alive(level, fmt, args...) \
 152                log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 153#define log_rdma_event(level, fmt, args...) \
 154                log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 155#define log_rdma_mr(level, fmt, args...) \
 156                log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 157
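/* Work handler: disconnect the RDMA connection if it is still connected */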
 158static void smbd_disconnect_rdma_work(struct work_struct *work)
 159{
 160        struct smbd_connection *info =
 161                container_of(work, struct smbd_connection, disconnect_work);
 162
 163        if (info->transport_status == SMBD_CONNECTED) {
 164                info->transport_status = SMBD_DISCONNECTING;
 165                rdma_disconnect(info->id);
 166        }
 167}
 168
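/* Schedule the disconnect work to tear down the RDMA connection */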
 169static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 170{
 171        queue_work(info->workqueue, &info->disconnect_work);
 172}
 173
 174/* Upcall from RDMA CM */
 175static int smbd_conn_upcall(
 176                struct rdma_cm_id *id, struct rdma_cm_event *event)
 177{
 178        struct smbd_connection *info = id->context;
 179
 180        log_rdma_event(INFO, "event=%d status=%d\n",
 181                event->event, event->status);
 182
 183        switch (event->event) {
 184        case RDMA_CM_EVENT_ADDR_RESOLVED:
 185        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 186                info->ri_rc = 0;
 187                complete(&info->ri_done);
 188                break;
 189
 190        case RDMA_CM_EVENT_ADDR_ERROR:
 191                info->ri_rc = -EHOSTUNREACH;
 192                complete(&info->ri_done);
 193                break;
 194
 195        case RDMA_CM_EVENT_ROUTE_ERROR:
 196                info->ri_rc = -ENETUNREACH;
 197                complete(&info->ri_done);
 198                break;
 199
 200        case RDMA_CM_EVENT_ESTABLISHED:
 201                log_rdma_event(INFO, "connected event=%d\n", event->event);
 202                info->transport_status = SMBD_CONNECTED;
 203                wake_up_interruptible(&info->conn_wait);
 204                break;
 205
 206        case RDMA_CM_EVENT_CONNECT_ERROR:
 207        case RDMA_CM_EVENT_UNREACHABLE:
 208        case RDMA_CM_EVENT_REJECTED:
 209                log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 210                info->transport_status = SMBD_DISCONNECTED;
 211                wake_up_interruptible(&info->conn_wait);
 212                break;
 213
 214        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 215        case RDMA_CM_EVENT_DISCONNECTED:
 216                /* This happens when we fail the negotiation */
 217                if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 218                        info->transport_status = SMBD_DISCONNECTED;
 219                        wake_up(&info->conn_wait);
 220                        break;
 221                }
 222
 223                info->transport_status = SMBD_DISCONNECTED;
 224                wake_up_interruptible(&info->disconn_wait);
 225                wake_up_interruptible(&info->wait_reassembly_queue);
 226                wake_up_interruptible_all(&info->wait_send_queue);
 227                break;
 228
 229        default:
 230                break;
 231        }
 232
 233        return 0;
 234}
 235
 236/* Upcall from RDMA QP */
 237static void
 238smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 239{
 240        struct smbd_connection *info = context;
 241
 242        log_rdma_event(ERR, "%s on device %s info %p\n",
 243                ib_event_msg(event->event), event->device->name, info);
 244
 245        switch (event->event) {
 246        case IB_EVENT_CQ_ERR:
 247        case IB_EVENT_QP_FATAL:
 248                smbd_disconnect_rdma_connection(info);
 249                break;
 250
 251        default:
 252                break;
 253        }
 254}
 255
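/* The SMBD packet buffer immediately follows the request/response structure */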
 256static inline void *smbd_request_payload(struct smbd_request *request)
 257{
 258        return (void *)request->packet;
 259}
 260
 261static inline void *smbd_response_payload(struct smbd_response *response)
 262{
 263        return (void *)response->packet;
 264}
 265
 266/* Called when a RDMA send is done */
 267static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 268{
 269        int i;
 270        struct smbd_request *request =
 271                container_of(wc->wr_cqe, struct smbd_request, cqe);
 272
 273        log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 274                request, wc->status);
 275
 276        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 277                log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 278                        wc->status, wc->opcode);
 279                smbd_disconnect_rdma_connection(request->info);
 280        }
 281
 282        for (i = 0; i < request->num_sge; i++)
 283                ib_dma_unmap_single(request->info->id->device,
 284                        request->sge[i].addr,
 285                        request->sge[i].length,
 286                        DMA_TO_DEVICE);
 287
 288        if (atomic_dec_and_test(&request->info->send_pending))
 289                wake_up(&request->info->wait_send_pending);
 290
 291        wake_up(&request->info->wait_post_send);
 292
 293        mempool_free(request, request->info->request_mempool);
 294}
 295
 296static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 297{
 298        log_rdma_event(INFO, "resp message min_version %u max_version %u negotiated_version %u credits_requested %u credits_granted %u status %u max_readwrite_size %u preferred_send_size %u max_receive_size %u max_fragmented_size %u\n",
 299                       resp->min_version, resp->max_version,
 300                       resp->negotiated_version, resp->credits_requested,
 301                       resp->credits_granted, resp->status,
 302                       resp->max_readwrite_size, resp->preferred_send_size,
 303                       resp->max_receive_size, resp->max_fragmented_size);
 304}
 305
 306/*
 307 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
 308 * response, packet_length: the negotiation response message
 309 * return value: true if negotiation succeeded, false if it failed
 310 */
 311static bool process_negotiation_response(
 312                struct smbd_response *response, int packet_length)
 313{
 314        struct smbd_connection *info = response->info;
 315        struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 316
 317        if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 318                log_rdma_event(ERR,
 319                        "error: packet_length=%d\n", packet_length);
 320                return false;
 321        }
 322
 323        if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 324                log_rdma_event(ERR, "error: negotiated_version=%x\n",
 325                        le16_to_cpu(packet->negotiated_version));
 326                return false;
 327        }
 328        info->protocol = le16_to_cpu(packet->negotiated_version);
 329
 330        if (packet->credits_requested == 0) {
 331                log_rdma_event(ERR, "error: credits_requested==0\n");
 332                return false;
 333        }
 334        info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 335
 336        if (packet->credits_granted == 0) {
 337                log_rdma_event(ERR, "error: credits_granted==0\n");
 338                return false;
 339        }
 340        atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 341
 342        atomic_set(&info->receive_credits, 0);
 343
 344        if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 345                log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 346                        le32_to_cpu(packet->preferred_send_size));
 347                return false;
 348        }
 349        info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 350
 351        if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 352                log_rdma_event(ERR, "error: max_receive_size=%d\n",
 353                        le32_to_cpu(packet->max_receive_size));
 354                return false;
 355        }
 356        info->max_send_size = min_t(int, info->max_send_size,
 357                                        le32_to_cpu(packet->max_receive_size));
 358
 359        if (le32_to_cpu(packet->max_fragmented_size) <
 360                        SMBD_MIN_FRAGMENTED_SIZE) {
 361                log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 362                        le32_to_cpu(packet->max_fragmented_size));
 363                return false;
 364        }
 365        info->max_fragmented_send_size =
 366                le32_to_cpu(packet->max_fragmented_size);
 367        info->rdma_readwrite_threshold =
 368                rdma_readwrite_threshold > info->max_fragmented_send_size ?
 369                info->max_fragmented_send_size :
 370                rdma_readwrite_threshold;
 371
 372
 373        info->max_readwrite_size = min_t(u32,
 374                        le32_to_cpu(packet->max_readwrite_size),
 375                        info->max_frmr_depth * PAGE_SIZE);
 376        info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 377
 378        return true;
 379}
 380
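/*
 * Work handler to post more receive buffers and record the number of new
 * receive credits to offer to the peer; may send an empty message so the
 * credits are granted promptly
 */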
 381static void smbd_post_send_credits(struct work_struct *work)
 382{
 383        int ret = 0;
 384        int use_receive_queue = 1;
 385        int rc;
 386        struct smbd_response *response;
 387        struct smbd_connection *info =
 388                container_of(work, struct smbd_connection,
 389                        post_send_credits_work);
 390
 391        if (info->transport_status != SMBD_CONNECTED) {
 392                wake_up(&info->wait_receive_queues);
 393                return;
 394        }
 395
 396        if (info->receive_credit_target >
 397                atomic_read(&info->receive_credits)) {
 398                while (true) {
 399                        if (use_receive_queue)
 400                                response = get_receive_buffer(info);
 401                        else
 402                                response = get_empty_queue_buffer(info);
 403                        if (!response) {
 404                                /* now switch to empty packet queue */
 405                                if (use_receive_queue) {
 406                                        use_receive_queue = 0;
 407                                        continue;
 408                                } else
 409                                        break;
 410                        }
 411
 412                        response->type = SMBD_TRANSFER_DATA;
 413                        response->first_segment = false;
 414                        rc = smbd_post_recv(info, response);
 415                        if (rc) {
 416                                log_rdma_recv(ERR,
 417                                        "post_recv failed rc=%d\n", rc);
 418                                put_receive_buffer(info, response);
 419                                break;
 420                        }
 421
 422                        ret++;
 423                }
 424        }
 425
 426        spin_lock(&info->lock_new_credits_offered);
 427        info->new_credits_offered += ret;
 428        spin_unlock(&info->lock_new_credits_offered);
 429
 430        /* Promptly send an immediate packet as defined in [MS-SMBD] 3.1.1.1 */
 431        info->send_immediate = true;
 432        if (atomic_read(&info->receive_credits) <
 433                info->receive_credit_target - 1) {
 434                if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
 435                    info->send_immediate) {
 436                        log_keep_alive(INFO, "send an empty message\n");
 437                        smbd_post_send_empty(info);
 438                }
 439        }
 440}
 441
 442/* Called from softirq, when recv is done */
 443static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 444{
 445        struct smbd_data_transfer *data_transfer;
 446        struct smbd_response *response =
 447                container_of(wc->wr_cqe, struct smbd_response, cqe);
 448        struct smbd_connection *info = response->info;
 449        int data_length = 0;
 450
 451        log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d byte_len=%d pkey_index=%x\n",
 452                      response, response->type, wc->status, wc->opcode,
 453                      wc->byte_len, wc->pkey_index);
 454
 455        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 456                log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 457                        wc->status, wc->opcode);
 458                smbd_disconnect_rdma_connection(info);
 459                goto error;
 460        }
 461
 462        ib_dma_sync_single_for_cpu(
 463                wc->qp->device,
 464                response->sge.addr,
 465                response->sge.length,
 466                DMA_FROM_DEVICE);
 467
 468        switch (response->type) {
 469        /* SMBD negotiation response */
 470        case SMBD_NEGOTIATE_RESP:
 471                dump_smbd_negotiate_resp(smbd_response_payload(response));
 472                info->full_packet_received = true;
 473                info->negotiate_done =
 474                        process_negotiation_response(response, wc->byte_len);
 475                complete(&info->negotiate_completion);
 476                break;
 477
 478        /* SMBD data transfer packet */
 479        case SMBD_TRANSFER_DATA:
 480                data_transfer = smbd_response_payload(response);
 481                data_length = le32_to_cpu(data_transfer->data_length);
 482
 483                /*
 484                 * If this is a packet with a data payload, place the data in the
 485                 * reassembly queue and wake up the reading thread
 486                 */
 487                if (data_length) {
 488                        if (info->full_packet_received)
 489                                response->first_segment = true;
 490
 491                        if (le32_to_cpu(data_transfer->remaining_data_length))
 492                                info->full_packet_received = false;
 493                        else
 494                                info->full_packet_received = true;
 495
 496                        enqueue_reassembly(
 497                                info,
 498                                response,
 499                                data_length);
 500                } else
 501                        put_empty_packet(info, response);
 502
 503                if (data_length)
 504                        wake_up_interruptible(&info->wait_reassembly_queue);
 505
 506                atomic_dec(&info->receive_credits);
 507                info->receive_credit_target =
 508                        le16_to_cpu(data_transfer->credits_requested);
 509                if (le16_to_cpu(data_transfer->credits_granted)) {
 510                        atomic_add(le16_to_cpu(data_transfer->credits_granted),
 511                                &info->send_credits);
 512                        /*
 513                         * We have new send credits granted from remote peer
 514                         * If any sender is waiting for credits, unblock it
 515                         */
 516                        wake_up_interruptible(&info->wait_send_queue);
 517                }
 518
 519                log_incoming(INFO, "data flags %d data_offset %d data_length %d remaining_data_length %d\n",
 520                             le16_to_cpu(data_transfer->flags),
 521                             le32_to_cpu(data_transfer->data_offset),
 522                             le32_to_cpu(data_transfer->data_length),
 523                             le32_to_cpu(data_transfer->remaining_data_length));
 524
 525                /* Send a KEEP_ALIVE response right away if requested */
 526                info->keep_alive_requested = KEEP_ALIVE_NONE;
 527                if (le16_to_cpu(data_transfer->flags) &
 528                                SMB_DIRECT_RESPONSE_REQUESTED) {
 529                        info->keep_alive_requested = KEEP_ALIVE_PENDING;
 530                }
 531
 532                return;
 533
 534        default:
 535                log_rdma_recv(ERR,
 536                        "unexpected response type=%d\n", response->type);
 537        }
 538
 539error:
 540        put_receive_buffer(info, response);
 541}
 542
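/* Create an RDMA CM ID for the connection and resolve the peer address and route */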
 543static struct rdma_cm_id *smbd_create_id(
 544                struct smbd_connection *info,
 545                struct sockaddr *dstaddr, int port)
 546{
 547        struct rdma_cm_id *id;
 548        int rc;
 549        __be16 *sport;
 550
 551        id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 552                RDMA_PS_TCP, IB_QPT_RC);
 553        if (IS_ERR(id)) {
 554                rc = PTR_ERR(id);
 555                log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 556                return id;
 557        }
 558
 559        if (dstaddr->sa_family == AF_INET6)
 560                sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 561        else
 562                sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 563
 564        *sport = htons(port);
 565
 566        init_completion(&info->ri_done);
 567        info->ri_rc = -ETIMEDOUT;
 568
 569        rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 570                RDMA_RESOLVE_TIMEOUT);
 571        if (rc) {
 572                log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 573                goto out;
 574        }
 575        wait_for_completion_interruptible_timeout(
 576                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 577        rc = info->ri_rc;
 578        if (rc) {
 579                log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 580                goto out;
 581        }
 582
 583        info->ri_rc = -ETIMEDOUT;
 584        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 585        if (rc) {
 586                log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 587                goto out;
 588        }
 589        wait_for_completion_interruptible_timeout(
 590                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 591        rc = info->ri_rc;
 592        if (rc) {
 593                log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 594                goto out;
 595        }
 596
 597        return id;
 598
 599out:
 600        rdma_destroy_id(id);
 601        return ERR_PTR(rc);
 602}
 603
 604/*
 605 * Test if FRWR (Fast Registration Work Requests) is supported on the device
 606 * This implementation requires FRWR for RDMA read/write
 607 * return value: true if it is supported
 608 */
 609static bool frwr_is_supported(struct ib_device_attr *attrs)
 610{
 611        if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 612                return false;
 613        if (attrs->max_fast_reg_page_list_len == 0)
 614                return false;
 615        return true;
 616}
 617
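/* Open the RDMA device: create the CM ID, check FRWR support and allocate the protection domain */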
 618static int smbd_ia_open(
 619                struct smbd_connection *info,
 620                struct sockaddr *dstaddr, int port)
 621{
 622        int rc;
 623
 624        info->id = smbd_create_id(info, dstaddr, port);
 625        if (IS_ERR(info->id)) {
 626                rc = PTR_ERR(info->id);
 627                goto out1;
 628        }
 629
 630        if (!frwr_is_supported(&info->id->device->attrs)) {
 631                log_rdma_event(ERR, "Fast Registration Work Requests (FRWR) is not supported\n");
 632                log_rdma_event(ERR, "Device capability flags = %llx max_fast_reg_page_list_len = %u\n",
 633                               info->id->device->attrs.device_cap_flags,
 634                               info->id->device->attrs.max_fast_reg_page_list_len);
 635                rc = -EPROTONOSUPPORT;
 636                goto out2;
 637        }
 638        info->max_frmr_depth = min_t(int,
 639                smbd_max_frmr_depth,
 640                info->id->device->attrs.max_fast_reg_page_list_len);
 641        info->mr_type = IB_MR_TYPE_MEM_REG;
 642        if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 643                info->mr_type = IB_MR_TYPE_SG_GAPS;
 644
 645        info->pd = ib_alloc_pd(info->id->device, 0);
 646        if (IS_ERR(info->pd)) {
 647                rc = PTR_ERR(info->pd);
 648                log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 649                goto out2;
 650        }
 651
 652        return 0;
 653
 654out2:
 655        rdma_destroy_id(info->id);
 656        info->id = NULL;
 657
 658out1:
 659        return rc;
 660}
 661
 662/*
 663 * Send a negotiation request message to the peer
 664 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 665 * After negotiation, the transport is connected and ready for
 666 * carrying upper layer SMB payload
 667 */
 668static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 669{
 670        struct ib_send_wr send_wr;
 671        int rc = -ENOMEM;
 672        struct smbd_request *request;
 673        struct smbd_negotiate_req *packet;
 674
 675        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 676        if (!request)
 677                return rc;
 678
 679        request->info = info;
 680
 681        packet = smbd_request_payload(request);
 682        packet->min_version = cpu_to_le16(SMBD_V1);
 683        packet->max_version = cpu_to_le16(SMBD_V1);
 684        packet->reserved = 0;
 685        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 686        packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 687        packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 688        packet->max_fragmented_size =
 689                cpu_to_le32(info->max_fragmented_recv_size);
 690
 691        request->num_sge = 1;
 692        request->sge[0].addr = ib_dma_map_single(
 693                                info->id->device, (void *)packet,
 694                                sizeof(*packet), DMA_TO_DEVICE);
 695        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 696                rc = -EIO;
 697                goto dma_mapping_failed;
 698        }
 699
 700        request->sge[0].length = sizeof(*packet);
 701        request->sge[0].lkey = info->pd->local_dma_lkey;
 702
 703        ib_dma_sync_single_for_device(
 704                info->id->device, request->sge[0].addr,
 705                request->sge[0].length, DMA_TO_DEVICE);
 706
 707        request->cqe.done = send_done;
 708
 709        send_wr.next = NULL;
 710        send_wr.wr_cqe = &request->cqe;
 711        send_wr.sg_list = request->sge;
 712        send_wr.num_sge = request->num_sge;
 713        send_wr.opcode = IB_WR_SEND;
 714        send_wr.send_flags = IB_SEND_SIGNALED;
 715
 716        log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 717                request->sge[0].addr,
 718                request->sge[0].length, request->sge[0].lkey);
 719
 720        atomic_inc(&info->send_pending);
 721        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 722        if (!rc)
 723                return 0;
 724
 725        /* if we reach here, post send failed */
 726        log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 727        atomic_dec(&info->send_pending);
 728        ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 729                request->sge[0].length, DMA_TO_DEVICE);
 730
 731        smbd_disconnect_rdma_connection(info);
 732
 733dma_mapping_failed:
 734        mempool_free(request, info->request_mempool);
 735        return rc;
 736}
 737
 738/*
 739 * Extend the credits to remote peer
 740 * This implements [MS-SMBD] 3.1.5.9
 741 * The idea is that we should extend credits to the remote peer as quickly as
 742 * allowed, to maintain data flow. We allocate as many receive
 743 * buffers as possible, and extend the receive credits to the remote peer
 744 * return value: the new credits being granted.
 745 */
 746static int manage_credits_prior_sending(struct smbd_connection *info)
 747{
 748        int new_credits;
 749
 750        spin_lock(&info->lock_new_credits_offered);
 751        new_credits = info->new_credits_offered;
 752        info->new_credits_offered = 0;
 753        spin_unlock(&info->lock_new_credits_offered);
 754
 755        return new_credits;
 756}
 757
 758/*
 759 * Check if we need to send a KEEP_ALIVE message
 760 * The idle connection timer triggers a KEEP_ALIVE message when it expires
 761 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flags to have the peer send
 762 * back a response.
 763 * return value:
 764 * 1: if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 765 * 0: otherwise
 766 */
 767static int manage_keep_alive_before_sending(struct smbd_connection *info)
 768{
 769        if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 770                info->keep_alive_requested = KEEP_ALIVE_SENT;
 771                return 1;
 772        }
 773        return 0;
 774}
 775
 776/* Post the send request */
 777static int smbd_post_send(struct smbd_connection *info,
 778                struct smbd_request *request)
 779{
 780        struct ib_send_wr send_wr;
 781        int rc, i;
 782
 783        for (i = 0; i < request->num_sge; i++) {
 784                log_rdma_send(INFO,
 785                        "rdma_request sge[%d] addr=%llu length=%u\n",
 786                        i, request->sge[i].addr, request->sge[i].length);
 787                ib_dma_sync_single_for_device(
 788                        info->id->device,
 789                        request->sge[i].addr,
 790                        request->sge[i].length,
 791                        DMA_TO_DEVICE);
 792        }
 793
 794        request->cqe.done = send_done;
 795
 796        send_wr.next = NULL;
 797        send_wr.wr_cqe = &request->cqe;
 798        send_wr.sg_list = request->sge;
 799        send_wr.num_sge = request->num_sge;
 800        send_wr.opcode = IB_WR_SEND;
 801        send_wr.send_flags = IB_SEND_SIGNALED;
 802
 803        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 804        if (rc) {
 805                log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 806                smbd_disconnect_rdma_connection(info);
 807                rc = -EAGAIN;
 808        } else
 809                /* Reset timer for idle connection after packet is sent */
 810                mod_delayed_work(info->workqueue, &info->idle_timer_work,
 811                        info->keep_alive_interval*HZ);
 812
 813        return rc;
 814}
 815
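/*
 * Build an SMBD data transfer packet for the scatterlist and post the send.
 * Waits for a send credit and for room on the send queue before posting.
 */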
 816static int smbd_post_send_sgl(struct smbd_connection *info,
 817        struct scatterlist *sgl, int data_length, int remaining_data_length)
 818{
 819        int num_sgs;
 820        int i, rc;
 821        int header_length;
 822        struct smbd_request *request;
 823        struct smbd_data_transfer *packet;
 824        int new_credits;
 825        struct scatterlist *sg;
 826
 827wait_credit:
 828        /* Wait for send credits. A SMBD packet needs one credit */
 829        rc = wait_event_interruptible(info->wait_send_queue,
 830                atomic_read(&info->send_credits) > 0 ||
 831                info->transport_status != SMBD_CONNECTED);
 832        if (rc)
 833                goto err_wait_credit;
 834
 835        if (info->transport_status != SMBD_CONNECTED) {
 836                log_outgoing(ERR, "disconnected not sending on wait_credit\n");
 837                rc = -EAGAIN;
 838                goto err_wait_credit;
 839        }
 840        if (unlikely(atomic_dec_return(&info->send_credits) < 0)) {
 841                atomic_inc(&info->send_credits);
 842                goto wait_credit;
 843        }
 844
 845wait_send_queue:
 846        wait_event(info->wait_post_send,
 847                atomic_read(&info->send_pending) < info->send_credit_target ||
 848                info->transport_status != SMBD_CONNECTED);
 849
 850        if (info->transport_status != SMBD_CONNECTED) {
 851                log_outgoing(ERR, "disconnected not sending on wait_send_queue\n");
 852                rc = -EAGAIN;
 853                goto err_wait_send_queue;
 854        }
 855
 856        if (unlikely(atomic_inc_return(&info->send_pending) >
 857                                info->send_credit_target)) {
 858                atomic_dec(&info->send_pending);
 859                goto wait_send_queue;
 860        }
 861
 862        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 863        if (!request) {
 864                rc = -ENOMEM;
 865                goto err_alloc;
 866        }
 867
 868        request->info = info;
 869
 870        /* Fill in the packet header */
 871        packet = smbd_request_payload(request);
 872        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 873
 874        new_credits = manage_credits_prior_sending(info);
 875        atomic_add(new_credits, &info->receive_credits);
 876        packet->credits_granted = cpu_to_le16(new_credits);
 877
 878        info->send_immediate = false;
 879
 880        packet->flags = 0;
 881        if (manage_keep_alive_before_sending(info))
 882                packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 883
 884        packet->reserved = 0;
 885        if (!data_length)
 886                packet->data_offset = 0;
 887        else
 888                packet->data_offset = cpu_to_le32(24);
 889        packet->data_length = cpu_to_le32(data_length);
 890        packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 891        packet->padding = 0;
 892
 893        log_outgoing(INFO, "credits_requested=%d credits_granted=%d data_offset=%d data_length=%d remaining_data_length=%d\n",
 894                     le16_to_cpu(packet->credits_requested),
 895                     le16_to_cpu(packet->credits_granted),
 896                     le32_to_cpu(packet->data_offset),
 897                     le32_to_cpu(packet->data_length),
 898                     le32_to_cpu(packet->remaining_data_length));
 899
 900        /* Map the packet to DMA */
 901        header_length = sizeof(struct smbd_data_transfer);
 902        /* If this is a packet without payload, don't send padding */
 903        if (!data_length)
 904                header_length = offsetof(struct smbd_data_transfer, padding);
 905
 906        request->num_sge = 1;
 907        request->sge[0].addr = ib_dma_map_single(info->id->device,
 908                                                 (void *)packet,
 909                                                 header_length,
 910                                                 DMA_TO_DEVICE);
 911        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 912                rc = -EIO;
 913                request->sge[0].addr = 0;
 914                goto err_dma;
 915        }
 916
 917        request->sge[0].length = header_length;
 918        request->sge[0].lkey = info->pd->local_dma_lkey;
 919
 920        /* Fill in the packet data payload */
 921        num_sgs = sgl ? sg_nents(sgl) : 0;
 922        for_each_sg(sgl, sg, num_sgs, i) {
 923                request->sge[i+1].addr =
 924                        ib_dma_map_page(info->id->device, sg_page(sg),
 925                               sg->offset, sg->length, DMA_TO_DEVICE);
 926                if (ib_dma_mapping_error(
 927                                info->id->device, request->sge[i+1].addr)) {
 928                        rc = -EIO;
 929                        request->sge[i+1].addr = 0;
 930                        goto err_dma;
 931                }
 932                request->sge[i+1].length = sg->length;
 933                request->sge[i+1].lkey = info->pd->local_dma_lkey;
 934                request->num_sge++;
 935        }
 936
 937        rc = smbd_post_send(info, request);
 938        if (!rc)
 939                return 0;
 940
 941err_dma:
 942        for (i = 0; i < request->num_sge; i++)
 943                if (request->sge[i].addr)
 944                        ib_dma_unmap_single(info->id->device,
 945                                            request->sge[i].addr,
 946                                            request->sge[i].length,
 947                                            DMA_TO_DEVICE);
 948        mempool_free(request, info->request_mempool);
 949
 950        /* roll back receive credits and credits to be offered */
 951        spin_lock(&info->lock_new_credits_offered);
 952        info->new_credits_offered += new_credits;
 953        spin_unlock(&info->lock_new_credits_offered);
 954        atomic_sub(new_credits, &info->receive_credits);
 955
 956err_alloc:
 957        if (atomic_dec_and_test(&info->send_pending))
 958                wake_up(&info->wait_send_pending);
 959
 960err_wait_send_queue:
 961        /* roll back send credits and pending */
 962        atomic_inc(&info->send_credits);
 963
 964err_wait_credit:
 965        return rc;
 966}
 967
 968/*
 969 * Send a page
 970 * page: the page to send
 971 * offset: offset in the page to send
 972 * size: length in the page to send
 973 * remaining_data_length: remaining data to send in this payload
 974 */
 975static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
 976                unsigned long offset, size_t size, int remaining_data_length)
 977{
 978        struct scatterlist sgl;
 979
 980        sg_init_table(&sgl, 1);
 981        sg_set_page(&sgl, page, size, offset);
 982
 983        return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
 984}
 985
 986/*
 987 * Send an empty message
 988 * An empty message is used to extend credits to the peer and for keep-alive
 989 * while there is no upper layer payload to send at the time
 990 */
 991static int smbd_post_send_empty(struct smbd_connection *info)
 992{
 993        info->count_send_empty++;
 994        return smbd_post_send_sgl(info, NULL, 0, 0);
 995}
 996
 997/*
 998 * Send a data buffer
 999 * iov: the iov array describing the data buffers
1000 * n_vec: number of entries in the iov array
1001 * remaining_data_length: remaining data to send following this packet
1002 * in a segmented SMBD message
1003 */
1004static int smbd_post_send_data(
1005        struct smbd_connection *info, struct kvec *iov, int n_vec,
1006        int remaining_data_length)
1007{
1008        int i;
1009        u32 data_length = 0;
1010        struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1011
1012        if (n_vec > SMBDIRECT_MAX_SGE) {
1013                cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1014                return -EINVAL;
1015        }
1016
1017        sg_init_table(sgl, n_vec);
1018        for (i = 0; i < n_vec; i++) {
1019                data_length += iov[i].iov_len;
1020                sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1021        }
1022
1023        return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1024}
1025
1026/*
1027 * Post a receive request to the transport
1028 * The remote peer can only send data when a receive request is posted
1029 * The interaction is controlled by the send/receive credit system
1030 */
1031static int smbd_post_recv(
1032                struct smbd_connection *info, struct smbd_response *response)
1033{
1034        struct ib_recv_wr recv_wr;
1035        int rc = -EIO;
1036
1037        response->sge.addr = ib_dma_map_single(
1038                                info->id->device, response->packet,
1039                                info->max_receive_size, DMA_FROM_DEVICE);
1040        if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1041                return rc;
1042
1043        response->sge.length = info->max_receive_size;
1044        response->sge.lkey = info->pd->local_dma_lkey;
1045
1046        response->cqe.done = recv_done;
1047
1048        recv_wr.wr_cqe = &response->cqe;
1049        recv_wr.next = NULL;
1050        recv_wr.sg_list = &response->sge;
1051        recv_wr.num_sge = 1;
1052
1053        rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
1054        if (rc) {
1055                ib_dma_unmap_single(info->id->device, response->sge.addr,
1056                                    response->sge.length, DMA_FROM_DEVICE);
1057                smbd_disconnect_rdma_connection(info);
1058                log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1059        }
1060
1061        return rc;
1062}
1063
1064/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1065static int smbd_negotiate(struct smbd_connection *info)
1066{
1067        int rc;
1068        struct smbd_response *response = get_receive_buffer(info);
1069
1070        response->type = SMBD_NEGOTIATE_RESP;
1071        rc = smbd_post_recv(info, response);
1072        log_rdma_event(INFO, "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x iov.lkey=%x\n",
1073                       rc, response->sge.addr,
1074                       response->sge.length, response->sge.lkey);
1075        if (rc)
1076                return rc;
1077
1078        init_completion(&info->negotiate_completion);
1079        info->negotiate_done = false;
1080        rc = smbd_post_send_negotiate_req(info);
1081        if (rc)
1082                return rc;
1083
1084        rc = wait_for_completion_interruptible_timeout(
1085                &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1086        log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1087
1088        if (info->negotiate_done)
1089                return 0;
1090
1091        if (rc == 0)
1092                rc = -ETIMEDOUT;
1093        else if (rc == -ERESTARTSYS)
1094                rc = -EINTR;
1095        else
1096                rc = -ENOTCONN;
1097
1098        return rc;
1099}
1100
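/* Queue a response that carried no payload so its buffer can be reposted as a receive */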
1101static void put_empty_packet(
1102                struct smbd_connection *info, struct smbd_response *response)
1103{
1104        spin_lock(&info->empty_packet_queue_lock);
1105        list_add_tail(&response->list, &info->empty_packet_queue);
1106        info->count_empty_packet_queue++;
1107        spin_unlock(&info->empty_packet_queue_lock);
1108
1109        queue_work(info->workqueue, &info->post_send_credits_work);
1110}
1111
1112/*
1113 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
1114 * This is a queue for reassembling upper layer payload and presenting it to the
1115 * upper layer. All incoming payloads go to the reassembly queue, regardless of
1116 * whether reassembly is required. The upper layer code reads from the queue for
1117 * all incoming payloads.
1118 * Put a received packet on the reassembly queue
1119 * response: the packet received
1120 * data_length: the size of payload in this packet
1121 */
1122static void enqueue_reassembly(
1123        struct smbd_connection *info,
1124        struct smbd_response *response,
1125        int data_length)
1126{
1127        spin_lock(&info->reassembly_queue_lock);
1128        list_add_tail(&response->list, &info->reassembly_queue);
1129        info->reassembly_queue_length++;
1130        /*
1131         * Make sure reassembly_data_length is updated after list and
1132         * reassembly_queue_length are updated. On the dequeue side
1133         * reassembly_data_length is checked without a lock to determine
1134         * if reassembly_queue_length and list are up to date
1135         */
1136        virt_wmb();
1137        info->reassembly_data_length += data_length;
1138        spin_unlock(&info->reassembly_queue_lock);
1139        info->count_reassembly_queue++;
1140        info->count_enqueue_reassembly_queue++;
1141}
1142
1143/*
1144 * Get the first entry at the front of reassembly queue
1145 * Caller is responsible for locking
1146 * return value: the first entry if any, NULL if queue is empty
1147 */
1148static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1149{
1150        struct smbd_response *ret = NULL;
1151
1152        if (!list_empty(&info->reassembly_queue)) {
1153                ret = list_first_entry(
1154                        &info->reassembly_queue,
1155                        struct smbd_response, list);
1156        }
1157        return ret;
1158}
1159
1160static struct smbd_response *get_empty_queue_buffer(
1161                struct smbd_connection *info)
1162{
1163        struct smbd_response *ret = NULL;
1164        unsigned long flags;
1165
1166        spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1167        if (!list_empty(&info->empty_packet_queue)) {
1168                ret = list_first_entry(
1169                        &info->empty_packet_queue,
1170                        struct smbd_response, list);
1171                list_del(&ret->list);
1172                info->count_empty_packet_queue--;
1173        }
1174        spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1175
1176        return ret;
1177}
1178
1179/*
1180 * Get a receive buffer
1181 * For each remote send, we need to post a receive. The receive buffers are
1182 * preallocated when the transport is established.
1183 * return value: the receive buffer, NULL if none is available
1184 */
1185static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1186{
1187        struct smbd_response *ret = NULL;
1188        unsigned long flags;
1189
1190        spin_lock_irqsave(&info->receive_queue_lock, flags);
1191        if (!list_empty(&info->receive_queue)) {
1192                ret = list_first_entry(
1193                        &info->receive_queue,
1194                        struct smbd_response, list);
1195                list_del(&ret->list);
1196                info->count_receive_queue--;
1197                info->count_get_receive_buffer++;
1198        }
1199        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1200
1201        return ret;
1202}
1203
1204/*
1205 * Return a receive buffer
1206 * When a receive buffer is returned, we can post a new receive and extend
1207 * more receive credits to the remote peer. This is done immediately after a
1208 * receive buffer is returned.
1209 */
1210static void put_receive_buffer(
1211        struct smbd_connection *info, struct smbd_response *response)
1212{
1213        unsigned long flags;
1214
1215        ib_dma_unmap_single(info->id->device, response->sge.addr,
1216                response->sge.length, DMA_FROM_DEVICE);
1217
1218        spin_lock_irqsave(&info->receive_queue_lock, flags);
1219        list_add_tail(&response->list, &info->receive_queue);
1220        info->count_receive_queue++;
1221        info->count_put_receive_buffer++;
1222        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1223
1224        queue_work(info->workqueue, &info->post_send_credits_work);
1225}
1226
1227/* Preallocate all receive buffers on transport establishment */
1228static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1229{
1230        int i;
1231        struct smbd_response *response;
1232
1233        INIT_LIST_HEAD(&info->reassembly_queue);
1234        spin_lock_init(&info->reassembly_queue_lock);
1235        info->reassembly_data_length = 0;
1236        info->reassembly_queue_length = 0;
1237
1238        INIT_LIST_HEAD(&info->receive_queue);
1239        spin_lock_init(&info->receive_queue_lock);
1240        info->count_receive_queue = 0;
1241
1242        INIT_LIST_HEAD(&info->empty_packet_queue);
1243        spin_lock_init(&info->empty_packet_queue_lock);
1244        info->count_empty_packet_queue = 0;
1245
1246        init_waitqueue_head(&info->wait_receive_queues);
1247
1248        for (i = 0; i < num_buf; i++) {
1249                response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1250                if (!response)
1251                        goto allocate_failed;
1252
1253                response->info = info;
1254                list_add_tail(&response->list, &info->receive_queue);
1255                info->count_receive_queue++;
1256        }
1257
1258        return 0;
1259
1260allocate_failed:
1261        while (!list_empty(&info->receive_queue)) {
1262                response = list_first_entry(
1263                                &info->receive_queue,
1264                                struct smbd_response, list);
1265                list_del(&response->list);
1266                info->count_receive_queue--;
1267
1268                mempool_free(response, info->response_mempool);
1269        }
1270        return -ENOMEM;
1271}
1272
1273static void destroy_receive_buffers(struct smbd_connection *info)
1274{
1275        struct smbd_response *response;
1276
1277        while ((response = get_receive_buffer(info)))
1278                mempool_free(response, info->response_mempool);
1279
1280        while ((response = get_empty_queue_buffer(info)))
1281                mempool_free(response, info->response_mempool);
1282}
1283
1284/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1285static void idle_connection_timer(struct work_struct *work)
1286{
1287        struct smbd_connection *info = container_of(
1288                                        work, struct smbd_connection,
1289                                        idle_timer_work.work);
1290
1291        if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1292                log_keep_alive(ERR,
1293                        "error status info->keep_alive_requested=%d\n",
1294                        info->keep_alive_requested);
1295                smbd_disconnect_rdma_connection(info);
1296                return;
1297        }
1298
1299        log_keep_alive(INFO, "about to send an empty idle message\n");
1300        smbd_post_send_empty(info);
1301
1302        /* Setup the next idle timeout work */
1303        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1304                        info->keep_alive_interval*HZ);
1305}
1306
1307/*
1308 * Destroy the transport and related RDMA and memory resources
1309 * Need to go through all the pending counters and make sure no one is using
1310 * the transport while it is being destroyed
1311 */
1312void smbd_destroy(struct TCP_Server_Info *server)
1313{
1314        struct smbd_connection *info = server->smbd_conn;
1315        struct smbd_response *response;
1316        unsigned long flags;
1317
1318        if (!info) {
1319                log_rdma_event(INFO, "rdma session already destroyed\n");
1320                return;
1321        }
1322
1323        log_rdma_event(INFO, "destroying rdma session\n");
1324        if (info->transport_status != SMBD_DISCONNECTED) {
1325                rdma_disconnect(server->smbd_conn->id);
1326                log_rdma_event(INFO, "wait for transport being disconnected\n");
1327                wait_event_interruptible(
1328                        info->disconn_wait,
1329                        info->transport_status == SMBD_DISCONNECTED);
1330        }
1331
1332        log_rdma_event(INFO, "destroying qp\n");
1333        ib_drain_qp(info->id->qp);
1334        rdma_destroy_qp(info->id);
1335
1336        log_rdma_event(INFO, "cancelling idle timer\n");
1337        cancel_delayed_work_sync(&info->idle_timer_work);
1338
1339        log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
1340        wait_event(info->wait_send_pending,
1341                atomic_read(&info->send_pending) == 0);
1342
1343        /* It's not possible for the upper layer to get to the reassembly queue */
1344        log_rdma_event(INFO, "drain the reassembly queue\n");
1345        do {
1346                spin_lock_irqsave(&info->reassembly_queue_lock, flags);
1347                response = _get_first_reassembly(info);
1348                if (response) {
1349                        list_del(&response->list);
1350                        spin_unlock_irqrestore(
1351                                &info->reassembly_queue_lock, flags);
1352                        put_receive_buffer(info, response);
1353                } else
1354                        spin_unlock_irqrestore(
1355                                &info->reassembly_queue_lock, flags);
1356        } while (response);
1357        info->reassembly_data_length = 0;
1358
1359        log_rdma_event(INFO, "free receive buffers\n");
1360        wait_event(info->wait_receive_queues,
1361                info->count_receive_queue + info->count_empty_packet_queue
1362                        == info->receive_credit_max);
1363        destroy_receive_buffers(info);
1364
1365        /*
1366         * For performance reasons, memory registration and deregistration
1367         * are not locked by srv_mutex. It is possible some processes are
1368         * blocked on transport srv_mutex while holding memory registration.
1369         * Release the transport srv_mutex to allow them to hit the failure
1370         * path when sending data, and then release memory registrations.
1371         */
1372        log_rdma_event(INFO, "freeing mr list\n");
1373        wake_up_interruptible_all(&info->wait_mr);
1374        while (atomic_read(&info->mr_used_count)) {
1375                mutex_unlock(&server->srv_mutex);
1376                msleep(1000);
1377                mutex_lock(&server->srv_mutex);
1378        }
1379        destroy_mr_list(info);
1380
1381        ib_free_cq(info->send_cq);
1382        ib_free_cq(info->recv_cq);
1383        ib_dealloc_pd(info->pd);
1384        rdma_destroy_id(info->id);
1385
1386        /* free mempools */
1387        mempool_destroy(info->request_mempool);
1388        kmem_cache_destroy(info->request_cache);
1389
1390        mempool_destroy(info->response_mempool);
1391        kmem_cache_destroy(info->response_cache);
1392
1393        info->transport_status = SMBD_DESTROYED;
1394
1395        destroy_workqueue(info->workqueue);
1396        log_rdma_event(INFO,  "rdma session destroyed\n");
1397        kfree(info);
1398}
1399
1400/*
1401 * Reconnect this SMBD connection, called from upper layer
1402 * return value: 0 on success, or actual error code
1403 */
1404int smbd_reconnect(struct TCP_Server_Info *server)
1405{
1406        log_rdma_event(INFO, "reconnecting rdma session\n");
1407
1408        if (!server->smbd_conn) {
1409                log_rdma_event(INFO, "rdma session already destroyed\n");
1410                goto create_conn;
1411        }
1412
1413        /*
1414         * This is possible if the transport is disconnected and we haven't received
1415         * a notification from RDMA, but the upper layer has detected a timeout
1416         */
1417        if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1418                log_rdma_event(INFO, "disconnecting transport\n");
1419                smbd_destroy(server);
1420        }
1421
1422create_conn:
1423        log_rdma_event(INFO, "creating rdma session\n");
1424        server->smbd_conn = smbd_get_connection(
1425                server, (struct sockaddr *) &server->dstaddr);
1426
1427        if (server->smbd_conn)
1428                cifs_dbg(VFS, "RDMA transport re-established\n");
1429
1430        return server->smbd_conn ? 0 : -ENOENT;
1431}
1432
1433static void destroy_caches_and_workqueue(struct smbd_connection *info)
1434{
1435        destroy_receive_buffers(info);
1436        destroy_workqueue(info->workqueue);
1437        mempool_destroy(info->response_mempool);
1438        kmem_cache_destroy(info->response_cache);
1439        mempool_destroy(info->request_mempool);
1440        kmem_cache_destroy(info->request_cache);
1441}
1442
1443#define MAX_NAME_LEN    80
1444static int allocate_caches_and_workqueue(struct smbd_connection *info)
1445{
1446        char name[MAX_NAME_LEN];
1447        int rc;
1448
1449        scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1450        info->request_cache =
1451                kmem_cache_create(
1452                        name,
1453                        sizeof(struct smbd_request) +
1454                                sizeof(struct smbd_data_transfer),
1455                        0, SLAB_HWCACHE_ALIGN, NULL);
1456        if (!info->request_cache)
1457                return -ENOMEM;
1458
1459        info->request_mempool =
1460                mempool_create(info->send_credit_target, mempool_alloc_slab,
1461                        mempool_free_slab, info->request_cache);
1462        if (!info->request_mempool)
1463                goto out1;
1464
1465        scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1466        info->response_cache =
1467                kmem_cache_create(
1468                        name,
1469                        sizeof(struct smbd_response) +
1470                                info->max_receive_size,
1471                        0, SLAB_HWCACHE_ALIGN, NULL);
1472        if (!info->response_cache)
1473                goto out2;
1474
1475        info->response_mempool =
1476                mempool_create(info->receive_credit_max, mempool_alloc_slab,
1477                       mempool_free_slab, info->response_cache);
1478        if (!info->response_mempool)
1479                goto out3;
1480
1481        scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1482        info->workqueue = create_workqueue(name);
1483        if (!info->workqueue)
1484                goto out4;
1485
1486        rc = allocate_receive_buffers(info, info->receive_credit_max);
1487        if (rc) {
1488                log_rdma_event(ERR, "failed to allocate receive buffers\n");
1489                goto out5;
1490        }
1491
1492        return 0;
1493
1494out5:
1495        destroy_workqueue(info->workqueue);
1496out4:
1497        mempool_destroy(info->response_mempool);
1498out3:
1499        kmem_cache_destroy(info->response_cache);
1500out2:
1501        mempool_destroy(info->request_mempool);
1502out1:
1503        kmem_cache_destroy(info->request_cache);
1504        return -ENOMEM;
1505}
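
/*
 * Minimal usage sketch, not part of the transport: the request mempool
 * above is sized to send_credit_target and the response mempool to
 * receive_credit_max, so each granted credit has a backing object. The
 * helper names below are illustrative only; the real send/receive paths
 * in this file do the equivalent inline.
 */
static struct smbd_request *example_get_request(struct smbd_connection *info)
{
        struct smbd_request *request;

        /* May sleep waiting for an object; not for atomic context */
        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
        if (!request)
                return NULL;
        request->info = info;
        return request;
}

static void example_put_request(struct smbd_request *request)
{
        mempool_free(request, request->info->request_mempool);
}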
1506
1507/* Create a SMBD connection, called by upper layer */
1508static struct smbd_connection *_smbd_get_connection(
1509        struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1510{
1511        int rc;
1512        struct smbd_connection *info;
1513        struct rdma_conn_param conn_param;
1514        struct ib_qp_init_attr qp_attr;
1515        struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1516        struct ib_port_immutable port_immutable;
1517        u32 ird_ord_hdr[2];
1518
1519        info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1520        if (!info)
1521                return NULL;
1522
1523        info->transport_status = SMBD_CONNECTING;
1524        rc = smbd_ia_open(info, dstaddr, port);
1525        if (rc) {
1526                log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1527                goto create_id_failed;
1528        }
1529
1530        if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1531            smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1532                log_rdma_event(ERR, "consider lowering send_credit_target = %d. Possible CQE overrun, device reporting max_cpe %d max_qp_wr %d\n",
1533                               smbd_send_credit_target,
1534                               info->id->device->attrs.max_cqe,
1535                               info->id->device->attrs.max_qp_wr);
1536                goto config_failed;
1537        }
1538
1539        if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1540            smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1541                log_rdma_event(ERR, "consider lowering receive_credit_max = %d. Possible CQE overrun, device reporting max_cpe %d max_qp_wr %d\n",
1542                               smbd_receive_credit_max,
1543                               info->id->device->attrs.max_cqe,
1544                               info->id->device->attrs.max_qp_wr);
1545                goto config_failed;
1546        }
1547
1548        info->receive_credit_max = smbd_receive_credit_max;
1549        info->send_credit_target = smbd_send_credit_target;
1550        info->max_send_size = smbd_max_send_size;
1551        info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1552        info->max_receive_size = smbd_max_receive_size;
1553        info->keep_alive_interval = smbd_keep_alive_interval;
1554
1555        if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
1556                log_rdma_event(ERR,
1557                        "warning: device max_send_sge = %d too small\n",
1558                        info->id->device->attrs.max_send_sge);
1559                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1560        }
1561        if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
1562                log_rdma_event(ERR,
1563                        "warning: device max_recv_sge = %d too small\n",
1564                        info->id->device->attrs.max_recv_sge);
1565                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1566        }
1567
1568        info->send_cq = NULL;
1569        info->recv_cq = NULL;
1570        info->send_cq =
1571                ib_alloc_cq_any(info->id->device, info,
1572                                info->send_credit_target, IB_POLL_SOFTIRQ);
1573        if (IS_ERR(info->send_cq)) {
1574                info->send_cq = NULL;
1575                goto alloc_cq_failed;
1576        }
1577
1578        info->recv_cq =
1579                ib_alloc_cq_any(info->id->device, info,
1580                                info->receive_credit_max, IB_POLL_SOFTIRQ);
1581        if (IS_ERR(info->recv_cq)) {
1582                info->recv_cq = NULL;
1583                goto alloc_cq_failed;
1584        }
1585
1586        memset(&qp_attr, 0, sizeof(qp_attr));
1587        qp_attr.event_handler = smbd_qp_async_error_upcall;
1588        qp_attr.qp_context = info;
1589        qp_attr.cap.max_send_wr = info->send_credit_target;
1590        qp_attr.cap.max_recv_wr = info->receive_credit_max;
1591        qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1592        qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1593        qp_attr.cap.max_inline_data = 0;
1594        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1595        qp_attr.qp_type = IB_QPT_RC;
1596        qp_attr.send_cq = info->send_cq;
1597        qp_attr.recv_cq = info->recv_cq;
1598        qp_attr.port_num = ~0;
1599
1600        rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1601        if (rc) {
1602                log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1603                goto create_qp_failed;
1604        }
1605
1606        memset(&conn_param, 0, sizeof(conn_param));
1607        conn_param.initiator_depth = 0;
1608
1609        conn_param.responder_resources =
1610                info->id->device->attrs.max_qp_rd_atom
1611                        < SMBD_CM_RESPONDER_RESOURCES ?
1612                info->id->device->attrs.max_qp_rd_atom :
1613                SMBD_CM_RESPONDER_RESOURCES;
1614        info->responder_resources = conn_param.responder_resources;
1615        log_rdma_mr(INFO, "responder_resources=%d\n",
1616                info->responder_resources);
1617
1618        /* Need to send IRD/ORD in private data for iWARP */
1619        info->id->device->ops.get_port_immutable(
1620                info->id->device, info->id->port_num, &port_immutable);
1621        if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1622                ird_ord_hdr[0] = info->responder_resources;
1623                ird_ord_hdr[1] = 1;
1624                conn_param.private_data = ird_ord_hdr;
1625                conn_param.private_data_len = sizeof(ird_ord_hdr);
1626        } else {
1627                conn_param.private_data = NULL;
1628                conn_param.private_data_len = 0;
1629        }
1630
1631        conn_param.retry_count = SMBD_CM_RETRY;
1632        conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1633        conn_param.flow_control = 0;
1634
1635        log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1636                &addr_in->sin_addr, port);
1637
1638        init_waitqueue_head(&info->conn_wait);
1639        init_waitqueue_head(&info->disconn_wait);
1640        init_waitqueue_head(&info->wait_reassembly_queue);
1641        rc = rdma_connect(info->id, &conn_param);
1642        if (rc) {
1643                log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1644                goto rdma_connect_failed;
1645        }
1646
1647        wait_event_interruptible(
1648                info->conn_wait, info->transport_status != SMBD_CONNECTING);
1649
1650        if (info->transport_status != SMBD_CONNECTED) {
1651                log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1652                goto rdma_connect_failed;
1653        }
1654
1655        log_rdma_event(INFO, "rdma_connect connected\n");
1656
1657        rc = allocate_caches_and_workqueue(info);
1658        if (rc) {
1659                log_rdma_event(ERR, "cache allocation failed\n");
1660                goto allocate_cache_failed;
1661        }
1662
1663        init_waitqueue_head(&info->wait_send_queue);
1664        INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1665        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1666                info->keep_alive_interval*HZ);
1667
1668        init_waitqueue_head(&info->wait_send_pending);
1669        atomic_set(&info->send_pending, 0);
1670
1671        init_waitqueue_head(&info->wait_post_send);
1672
1673        INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1674        INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1675        info->new_credits_offered = 0;
1676        spin_lock_init(&info->lock_new_credits_offered);
1677
1678        rc = smbd_negotiate(info);
1679        if (rc) {
1680                log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1681                goto negotiation_failed;
1682        }
1683
1684        rc = allocate_mr_list(info);
1685        if (rc) {
1686                log_rdma_mr(ERR, "memory registration allocation failed\n");
1687                goto allocate_mr_failed;
1688        }
1689
1690        return info;
1691
1692allocate_mr_failed:
1693        /* At this point, we need a full transport shutdown */
1694        smbd_destroy(server);
1695        return NULL;
1696
1697negotiation_failed:
1698        cancel_delayed_work_sync(&info->idle_timer_work);
1699        destroy_caches_and_workqueue(info);
1700        info->transport_status = SMBD_NEGOTIATE_FAILED;
1701        init_waitqueue_head(&info->conn_wait);
1702        rdma_disconnect(info->id);
1703        wait_event(info->conn_wait,
1704                info->transport_status == SMBD_DISCONNECTED);
1705
1706allocate_cache_failed:
1707rdma_connect_failed:
1708        rdma_destroy_qp(info->id);
1709
1710create_qp_failed:
1711alloc_cq_failed:
1712        if (info->send_cq)
1713                ib_free_cq(info->send_cq);
1714        if (info->recv_cq)
1715                ib_free_cq(info->recv_cq);
1716
1717config_failed:
1718        ib_dealloc_pd(info->pd);
1719        rdma_destroy_id(info->id);
1720
1721create_id_failed:
1722        kfree(info);
1723        return NULL;
1724}
1725
1726struct smbd_connection *smbd_get_connection(
1727        struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1728{
1729        struct smbd_connection *ret;
1730        int port = SMBD_PORT;
1731
1732try_again:
1733        ret = _smbd_get_connection(server, dstaddr, port);
1734
1735        /* Try SMB_PORT if SMBD_PORT doesn't work */
1736        if (!ret && port == SMBD_PORT) {
1737                port = SMB_PORT;
1738                goto try_again;
1739        }
1740        return ret;
1741}
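
/*
 * Caller-side sketch, illustrative only: the upper layer passes just the
 * resolved destination address and lets smbd_get_connection() handle the
 * SMB Direct port preference (5445 first, then 445). The helper name is
 * hypothetical; smbd_reconnect() above shows the real call site.
 */
static struct smbd_connection *example_connect(struct TCP_Server_Info *server)
{
        return smbd_get_connection(server,
                        (struct sockaddr *)&server->dstaddr);
}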
1742
1743/*
1744 * Receive data from receive reassembly queue
1745 * All the incoming data packets are placed in reassembly queue
1746 * buf: the buffer to read data into
1747 * size: the length of data to read
1748 * return value: actual data read
1749 * Note: this implementation copies the data from the reassembly queue to the
1750 * receive buffers used by the upper layer. This is not the optimal code path.
1751 * A better way would be to have the upper layer borrow the buffer from the
1752 * reassembly queue instead of allocating its own, and return it after the data
1753 * is consumed. But this would require more changes to the upper layer code, and
1754 * would also need to handle packet boundaries while packets are still being reassembled.
1755 */
1756static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1757                unsigned int size)
1758{
1759        struct smbd_response *response;
1760        struct smbd_data_transfer *data_transfer;
1761        int to_copy, to_read, data_read, offset;
1762        u32 data_length, remaining_data_length, data_offset;
1763        int rc;
1764
1765again:
1766        /*
1767         * No need to hold the reassembly queue lock all the time as we are
1768         * the only one reading from the front of the queue. The transport
1769         * may add more entries to the back of the queue at the same time
1770         */
1771        log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1772                info->reassembly_data_length);
1773        if (info->reassembly_data_length >= size) {
1774                int queue_length;
1775                int queue_removed = 0;
1776
1777                /*
1778                 * Need to make sure reassembly_data_length is read before
1779                 * reading reassembly_queue_length and calling
1780                 * _get_first_reassembly. This call is lock free
1781                 * as we never read the entries at the end of the queue, which
1782                 * are being updated in SOFTIRQ context as more data is received
1783                 */
1784                virt_rmb();
1785                queue_length = info->reassembly_queue_length;
1786                data_read = 0;
1787                to_read = size;
1788                offset = info->first_entry_offset;
1789                while (data_read < size) {
1790                        response = _get_first_reassembly(info);
1791                        data_transfer = smbd_response_payload(response);
1792                        data_length = le32_to_cpu(data_transfer->data_length);
1793                        remaining_data_length =
1794                                le32_to_cpu(
1795                                        data_transfer->remaining_data_length);
1796                        data_offset = le32_to_cpu(data_transfer->data_offset);
1797
1798                        /*
1799                         * The upper layer expects the RFC1002 length at the
1800                         * beginning of the payload. Return it to indicate
1801                         * the total length of the packet. This minimizes the
1802                         * changes to the upper layer packet processing logic.
1803                         * This will eventually be removed when an intermediate
1804                         * transport layer is added
1805                         */
1806                        if (response->first_segment && size == 4) {
1807                                unsigned int rfc1002_len =
1808                                        data_length + remaining_data_length;
1809                                *((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1810                                data_read = 4;
1811                                response->first_segment = false;
1812                                log_read(INFO, "returning rfc1002 length %d\n",
1813                                        rfc1002_len);
1814                                goto read_rfc1002_done;
1815                        }
1816
1817                        to_copy = min_t(int, data_length - offset, to_read);
1818                        memcpy(
1819                                buf + data_read,
1820                                (char *)data_transfer + data_offset + offset,
1821                                to_copy);
1822
1823                        /* move on to the next buffer? */
1824                        if (to_copy == data_length - offset) {
1825                                queue_length--;
1826                                /*
1827                                 * No need to lock if we are not at the
1828                                 * end of the queue
1829                                 */
1830                                if (queue_length)
1831                                        list_del(&response->list);
1832                                else {
1833                                        spin_lock_irq(
1834                                                &info->reassembly_queue_lock);
1835                                        list_del(&response->list);
1836                                        spin_unlock_irq(
1837                                                &info->reassembly_queue_lock);
1838                                }
1839                                queue_removed++;
1840                                info->count_reassembly_queue--;
1841                                info->count_dequeue_reassembly_queue++;
1842                                put_receive_buffer(info, response);
1843                                offset = 0;
1844                                log_read(INFO, "put_receive_buffer offset=0\n");
1845                        } else
1846                                offset += to_copy;
1847
1848                        to_read -= to_copy;
1849                        data_read += to_copy;
1850
1851                        log_read(INFO, "_get_first_reassembly memcpy %d bytes data_transfer_length-offset=%d after that to_read=%d data_read=%d offset=%d\n",
1852                                 to_copy, data_length - offset,
1853                                 to_read, data_read, offset);
1854                }
1855
1856                spin_lock_irq(&info->reassembly_queue_lock);
1857                info->reassembly_data_length -= data_read;
1858                info->reassembly_queue_length -= queue_removed;
1859                spin_unlock_irq(&info->reassembly_queue_lock);
1860
1861                info->first_entry_offset = offset;
1862                log_read(INFO, "returning to thread data_read=%d reassembly_data_length=%d first_entry_offset=%d\n",
1863                         data_read, info->reassembly_data_length,
1864                         info->first_entry_offset);
1865read_rfc1002_done:
1866                return data_read;
1867        }
1868
1869        log_read(INFO, "wait_event on more data\n");
1870        rc = wait_event_interruptible(
1871                info->wait_reassembly_queue,
1872                info->reassembly_data_length >= size ||
1873                        info->transport_status != SMBD_CONNECTED);
1874        /* Don't return any data if interrupted */
1875        if (rc)
1876                return rc;
1877
1878        if (info->transport_status != SMBD_CONNECTED) {
1879                log_read(ERR, "disconnected\n");
1880                return -ECONNABORTED;
1881        }
1882
1883        goto again;
1884}
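
/*
 * Framing sketch, illustrative only: as described above, a 4-byte read
 * on a fresh payload returns the RFC1002-style length in network byte
 * order; the caller then reads that many bytes of SMB2 data. The helper
 * name, buffer handling and error codes are hypothetical; the real
 * reader is the upper layer's demultiplex thread.
 */
static int example_read_one_pdu(struct smbd_connection *info, char *buf,
                unsigned int buflen)
{
        __be32 len_field;
        unsigned int pdu_len;
        int rc;

        /* First segment of a payload: returns the 4-byte length field */
        rc = smbd_recv_buf(info, (char *)&len_field, 4);
        if (rc != 4)
                return rc < 0 ? rc : -EIO;

        pdu_len = be32_to_cpu(len_field);
        if (pdu_len > buflen)
                return -EMSGSIZE;

        /* Then copy the SMB2 message itself out of the reassembly queue */
        return smbd_recv_buf(info, buf, pdu_len);
}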
1885
1886/*
1887 * Receive a page from receive reassembly queue
1888 * page: the page to read data into
1889 * to_read: the length of data to read
1890 * return value: actual data read
1891 */
1892static int smbd_recv_page(struct smbd_connection *info,
1893                struct page *page, unsigned int page_offset,
1894                unsigned int to_read)
1895{
1896        int ret;
1897        char *to_address;
1898        void *page_address;
1899
1900        /* make sure we have the page ready for read */
1901        ret = wait_event_interruptible(
1902                info->wait_reassembly_queue,
1903                info->reassembly_data_length >= to_read ||
1904                        info->transport_status != SMBD_CONNECTED);
1905        if (ret)
1906                return ret;
1907
1908        /* now we can read from reassembly queue and not sleep */
1909        page_address = kmap_atomic(page);
1910        to_address = (char *) page_address + page_offset;
1911
1912        log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
1913                page, to_address, to_read);
1914
1915        ret = smbd_recv_buf(info, to_address, to_read);
1916        kunmap_atomic(page_address);
1917
1918        return ret;
1919}
1920
1921/*
1922 * Receive data from transport
1923 * msg: a msghdr pointing to the buffer, can be ITER_KVEC or ITER_BVEC
1924 * return: total bytes read, or 0. SMB Direct will not do partial read.
1925 */
1926int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
1927{
1928        char *buf;
1929        struct page *page;
1930        unsigned int to_read, page_offset;
1931        int rc;
1932
1933        if (iov_iter_rw(&msg->msg_iter) == WRITE) {
1934                /* It's a bug in the upper layer to get here */
1935                cifs_dbg(VFS, "Invalid msg iter dir %u\n",
1936                         iov_iter_rw(&msg->msg_iter));
1937                rc = -EINVAL;
1938                goto out;
1939        }
1940
1941        switch (iov_iter_type(&msg->msg_iter)) {
1942        case ITER_KVEC:
1943                buf = msg->msg_iter.kvec->iov_base;
1944                to_read = msg->msg_iter.kvec->iov_len;
1945                rc = smbd_recv_buf(info, buf, to_read);
1946                break;
1947
1948        case ITER_BVEC:
1949                page = msg->msg_iter.bvec->bv_page;
1950                page_offset = msg->msg_iter.bvec->bv_offset;
1951                to_read = msg->msg_iter.bvec->bv_len;
1952                rc = smbd_recv_page(info, page, page_offset, to_read);
1953                break;
1954
1955        default:
1956                /* It's a bug in the upper layer to get here */
1957                cifs_dbg(VFS, "Invalid msg type %d\n",
1958                         iov_iter_type(&msg->msg_iter));
1959                rc = -EINVAL;
1960        }
1961
1962out:
1963        /* SMBDirect will read it all or nothing */
1964        if (rc > 0)
1965                msg->msg_iter.count = 0;
1966        return rc;
1967}
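
/*
 * Caller-side sketch, illustrative only: the upper layer describes its
 * receive buffer with an iov_iter and lets smbd_recv() dispatch on the
 * iterator type. The READ direction constant and iov_iter_kvec() usage
 * follow what the CIFS demultiplex code of this kernel generation does;
 * the helper name is hypothetical.
 */
static int example_recv_into_buffer(struct smbd_connection *info,
                char *buf, size_t len)
{
        struct msghdr msg = {};
        struct kvec iov = { .iov_base = buf, .iov_len = len };

        iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
        return smbd_recv(info, &msg);
}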
1968
1969/*
1970 * Send data to transport
1971 * Each rqst is transported as an SMBDirect payload
1972 * rqst: the data to write
1973 * return value: 0 if successfully written, otherwise error code
1974 */
1975int smbd_send(struct TCP_Server_Info *server,
1976        int num_rqst, struct smb_rqst *rqst_array)
1977{
1978        struct smbd_connection *info = server->smbd_conn;
1979        struct kvec vec;
1980        int nvecs;
1981        int size;
1982        unsigned int buflen, remaining_data_length;
1983        int start, i, j;
1984        int max_iov_size =
1985                info->max_send_size - sizeof(struct smbd_data_transfer);
1986        struct kvec *iov;
1987        int rc;
1988        struct smb_rqst *rqst;
1989        int rqst_idx;
1990
1991        if (info->transport_status != SMBD_CONNECTED) {
1992                rc = -EAGAIN;
1993                goto done;
1994        }
1995
1996        /*
1997         * Add in the page array if there is one. The caller needs to set
1998         * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
1999         * ends at a page boundary
2000         */
2001        remaining_data_length = 0;
2002        for (i = 0; i < num_rqst; i++)
2003                remaining_data_length += smb_rqst_len(server, &rqst_array[i]);
2004
2005        if (remaining_data_length > info->max_fragmented_send_size) {
2006                log_write(ERR, "payload size %d > max size %d\n",
2007                        remaining_data_length, info->max_fragmented_send_size);
2008                rc = -EINVAL;
2009                goto done;
2010        }
2011
2012        log_write(INFO, "num_rqst=%d total length=%u\n",
2013                        num_rqst, remaining_data_length);
2014
2015        rqst_idx = 0;
2016next_rqst:
2017        rqst = &rqst_array[rqst_idx];
2018        iov = rqst->rq_iov;
2019
2020        cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
2021                rqst_idx, smb_rqst_len(server, rqst));
2022        for (i = 0; i < rqst->rq_nvec; i++)
2023                dump_smb(iov[i].iov_base, iov[i].iov_len);
2024
2025
2026        log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d rq_tailsz=%d buflen=%lu\n",
2027                  rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2028                  rqst->rq_tailsz, smb_rqst_len(server, rqst));
2029
2030        start = i = 0;
2031        buflen = 0;
2032        while (true) {
2033                buflen += iov[i].iov_len;
2034                if (buflen > max_iov_size) {
2035                        if (i > start) {
2036                                remaining_data_length -=
2037                                        (buflen-iov[i].iov_len);
2038                                log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2039                                          start, i, i - start,
2040                                          remaining_data_length);
2041                                rc = smbd_post_send_data(
2042                                        info, &iov[start], i-start,
2043                                        remaining_data_length);
2044                                if (rc)
2045                                        goto done;
2046                        } else {
2047                                /* iov[start] is too big, break it */
2048                                nvecs = (buflen+max_iov_size-1)/max_iov_size;
2049                                log_write(INFO, "iov[%d] iov_base=%p buflen=%d break to %d vectors\n",
2050                                          start, iov[start].iov_base,
2051                                          buflen, nvecs);
2052                                for (j = 0; j < nvecs; j++) {
2053                                        vec.iov_base =
2054                                                (char *)iov[start].iov_base +
2055                                                j*max_iov_size;
2056                                        vec.iov_len = max_iov_size;
2057                                        if (j == nvecs-1)
2058                                                vec.iov_len =
2059                                                        buflen -
2060                                                        max_iov_size*(nvecs-1);
2061                                        remaining_data_length -= vec.iov_len;
2062                                        log_write(INFO,
2063                                                "sending vec j=%d iov_base=%p iov_len=%zu remaining_data_length=%d\n",
2064                                                  j, vec.iov_base, vec.iov_len,
2065                                                  remaining_data_length);
2066                                        rc = smbd_post_send_data(
2067                                                info, &vec, 1,
2068                                                remaining_data_length);
2069                                        if (rc)
2070                                                goto done;
2071                                }
2072                                i++;
2073                                if (i == rqst->rq_nvec)
2074                                        break;
2075                        }
2076                        start = i;
2077                        buflen = 0;
2078                } else {
2079                        i++;
2080                        if (i == rqst->rq_nvec) {
2081                                /* send out all remaining vecs */
2082                                remaining_data_length -= buflen;
2083                                log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2084                                          start, i, i - start,
2085                                          remaining_data_length);
2086                                rc = smbd_post_send_data(info, &iov[start],
2087                                        i-start, remaining_data_length);
2088                                if (rc)
2089                                        goto done;
2090                                break;
2091                        }
2092                }
2093                log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2094        }
2095
2096        /* now sending pages if there are any */
2097        for (i = 0; i < rqst->rq_npages; i++) {
2098                unsigned int offset;
2099
2100                rqst_page_get_length(rqst, i, &buflen, &offset);
2101                nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2102                log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2103                        buflen, nvecs);
2104                for (j = 0; j < nvecs; j++) {
2105                        size = max_iov_size;
2106                        if (j == nvecs-1)
2107                                size = buflen - j*max_iov_size;
2108                        remaining_data_length -= size;
2109                        log_write(INFO, "sending pages i=%d offset=%d size=%d remaining_data_length=%d\n",
2110                                  i, j * max_iov_size + offset, size,
2111                                  remaining_data_length);
2112                        rc = smbd_post_send_page(
2113                                info, rqst->rq_pages[i],
2114                                j*max_iov_size + offset,
2115                                size, remaining_data_length);
2116                        if (rc)
2117                                goto done;
2118                }
2119        }
2120
2121        rqst_idx++;
2122        if (rqst_idx < num_rqst)
2123                goto next_rqst;
2124
2125done:
2126        /*
2127         * As an optimization, we don't wait for individual I/O to finish
2128         * before sending the next one.
2129         * Send them all and wait for the pending send count to reach 0,
2130         * which means all the I/Os have gone out and we are good to return
2131         */
2132
2133        wait_event(info->wait_send_pending,
2134                atomic_read(&info->send_pending) == 0);
2135
2136        return rc;
2137}
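
/*
 * Arithmetic sketch for the splitting above, illustrative only: a vector
 * larger than max_iov_size is cut into nvecs = DIV_ROUND_UP(buflen,
 * max_iov_size) pieces, each at most max_iov_size bytes, with the last
 * one carrying the remainder. The helper name is hypothetical.
 */
static unsigned int example_chunk_len(unsigned int buflen,
                unsigned int max_iov_size, unsigned int j)
{
        unsigned int nvecs = (buflen + max_iov_size - 1) / max_iov_size;

        /* e.g. buflen=3000, max_iov_size=1364 -> 1364, 1364, 272 */
        if (j == nvecs - 1)
                return buflen - j * max_iov_size;
        return max_iov_size;
}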
2138
2139static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2140{
2141        struct smbd_mr *mr;
2142        struct ib_cqe *cqe;
2143
2144        if (wc->status) {
2145                log_rdma_mr(ERR, "status=%d\n", wc->status);
2146                cqe = wc->wr_cqe;
2147                mr = container_of(cqe, struct smbd_mr, cqe);
2148                smbd_disconnect_rdma_connection(mr->conn);
2149        }
2150}
2151
2152/*
2153 * The work queue function that recovers MRs
2154 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2155 * again. Both calls are slow, so finish them in a workqueue; this way they do
2156 * not block the I/O path.
2157 * There is one workqueue that recovers MRs, so there is no need to lock as the
2158 * I/O requests calling smbd_register_mr will never update the links in the
2159 * mr_list.
2160 */
2161static void smbd_mr_recovery_work(struct work_struct *work)
2162{
2163        struct smbd_connection *info =
2164                container_of(work, struct smbd_connection, mr_recovery_work);
2165        struct smbd_mr *smbdirect_mr;
2166        int rc;
2167
2168        list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2169                if (smbdirect_mr->state == MR_ERROR) {
2170
2171                        /* recover this MR entry */
2172                        rc = ib_dereg_mr(smbdirect_mr->mr);
2173                        if (rc) {
2174                                log_rdma_mr(ERR,
2175                                        "ib_dereg_mr failed rc=%x\n",
2176                                        rc);
2177                                smbd_disconnect_rdma_connection(info);
2178                                continue;
2179                        }
2180
2181                        smbdirect_mr->mr = ib_alloc_mr(
2182                                info->pd, info->mr_type,
2183                                info->max_frmr_depth);
2184                        if (IS_ERR(smbdirect_mr->mr)) {
2185                                log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2186                                            info->mr_type,
2187                                            info->max_frmr_depth);
2188                                smbd_disconnect_rdma_connection(info);
2189                                continue;
2190                        }
2191                } else
2192                        /* This MR is being used, don't recover it */
2193                        continue;
2194
2195                smbdirect_mr->state = MR_READY;
2196
2197                /* smbdirect_mr->state is updated by this function
2198                 * and is read and updated by I/O issuing CPUs trying
2199                 * to get an MR; the call to atomic_inc_return
2200                 * implies a memory barrier and guarantees this
2201                 * value is updated before waking up any calls to
2202                 * get_mr() from the I/O issuing CPUs
2203                 */
2204                if (atomic_inc_return(&info->mr_ready_count) == 1)
2205                        wake_up_interruptible(&info->wait_mr);
2206        }
2207}
2208
2209static void destroy_mr_list(struct smbd_connection *info)
2210{
2211        struct smbd_mr *mr, *tmp;
2212
2213        cancel_work_sync(&info->mr_recovery_work);
2214        list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2215                if (mr->state == MR_INVALIDATED)
2216                        ib_dma_unmap_sg(info->id->device, mr->sgl,
2217                                mr->sgl_count, mr->dir);
2218                ib_dereg_mr(mr->mr);
2219                kfree(mr->sgl);
2220                kfree(mr);
2221        }
2222}
2223
2224/*
2225 * Allocate MRs used for RDMA read/write
2226 * The number of MRs will not exceed hardware capability in responder_resources
2227 * All MRs are kept in mr_list. The MR can be recovered after it's used
2228 * Recovery is done in smbd_mr_recovery_work. The content of a list entry changes
2229 * as MRs are used and recovered for I/O, but the list links will not change
2230 */
2231static int allocate_mr_list(struct smbd_connection *info)
2232{
2233        int i;
2234        struct smbd_mr *smbdirect_mr, *tmp;
2235
2236        INIT_LIST_HEAD(&info->mr_list);
2237        init_waitqueue_head(&info->wait_mr);
2238        spin_lock_init(&info->mr_list_lock);
2239        atomic_set(&info->mr_ready_count, 0);
2240        atomic_set(&info->mr_used_count, 0);
2241        init_waitqueue_head(&info->wait_for_mr_cleanup);
2242        /* Allocate more MRs (2x) than hardware responder_resources */
2243        for (i = 0; i < info->responder_resources * 2; i++) {
2244                smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2245                if (!smbdirect_mr)
2246                        goto out;
2247                smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2248                                        info->max_frmr_depth);
2249                if (IS_ERR(smbdirect_mr->mr)) {
2250                        log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2251                                    info->mr_type, info->max_frmr_depth);
2252                        goto out;
2253                }
2254                smbdirect_mr->sgl = kcalloc(
2255                                        info->max_frmr_depth,
2256                                        sizeof(struct scatterlist),
2257                                        GFP_KERNEL);
2258                if (!smbdirect_mr->sgl) {
2259                        log_rdma_mr(ERR, "failed to allocate sgl\n");
2260                        ib_dereg_mr(smbdirect_mr->mr);
2261                        goto out;
2262                }
2263                smbdirect_mr->state = MR_READY;
2264                smbdirect_mr->conn = info;
2265
2266                list_add_tail(&smbdirect_mr->list, &info->mr_list);
2267                atomic_inc(&info->mr_ready_count);
2268        }
2269        INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2270        return 0;
2271
2272out:
2273        kfree(smbdirect_mr);
2274
2275        list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2276                ib_dereg_mr(smbdirect_mr->mr);
2277                kfree(smbdirect_mr->sgl);
2278                kfree(smbdirect_mr);
2279        }
2280        return -ENOMEM;
2281}
2282
2283/*
2284 * Get an MR from mr_list. This function waits until there is at least one
2285 * MR available in the list. It may access the list while
2286 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2287 * as they never modify the same places. However, several CPUs may be
2288 * issuing I/O and trying to get an MR at the same time; mr_list_lock is
2289 * used to protect against that.
2290 */
2291static struct smbd_mr *get_mr(struct smbd_connection *info)
2292{
2293        struct smbd_mr *ret;
2294        int rc;
2295again:
2296        rc = wait_event_interruptible(info->wait_mr,
2297                atomic_read(&info->mr_ready_count) ||
2298                info->transport_status != SMBD_CONNECTED);
2299        if (rc) {
2300                log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2301                return NULL;
2302        }
2303
2304        if (info->transport_status != SMBD_CONNECTED) {
2305                log_rdma_mr(ERR, "info->transport_status=%x\n",
2306                        info->transport_status);
2307                return NULL;
2308        }
2309
2310        spin_lock(&info->mr_list_lock);
2311        list_for_each_entry(ret, &info->mr_list, list) {
2312                if (ret->state == MR_READY) {
2313                        ret->state = MR_REGISTERED;
2314                        spin_unlock(&info->mr_list_lock);
2315                        atomic_dec(&info->mr_ready_count);
2316                        atomic_inc(&info->mr_used_count);
2317                        return ret;
2318                }
2319        }
2320
2321        spin_unlock(&info->mr_list_lock);
2322        /*
2323         * It is possible that we could fail to get an MR because other processes
2324         * may try to acquire an MR at the same time. If this is the case, retry.
2325         */
2326        goto again;
2327}
2328
2329/*
2330 * Register memory for RDMA read/write
2331 * pages[]: the list of pages to register memory with
2332 * num_pages: the number of pages to register
2333 * tailsz: if non-zero, the bytes to register in the last page
2334 * writing: true if this is an RDMA write (SMB read), false for RDMA read
2335 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2336 * return value: the MR registered, NULL if failed.
2337 */
2338struct smbd_mr *smbd_register_mr(
2339        struct smbd_connection *info, struct page *pages[], int num_pages,
2340        int offset, int tailsz, bool writing, bool need_invalidate)
2341{
2342        struct smbd_mr *smbdirect_mr;
2343        int rc, i;
2344        enum dma_data_direction dir;
2345        struct ib_reg_wr *reg_wr;
2346
2347        if (num_pages > info->max_frmr_depth) {
2348                log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2349                        num_pages, info->max_frmr_depth);
2350                return NULL;
2351        }
2352
2353        smbdirect_mr = get_mr(info);
2354        if (!smbdirect_mr) {
2355                log_rdma_mr(ERR, "get_mr returning NULL\n");
2356                return NULL;
2357        }
2358        smbdirect_mr->need_invalidate = need_invalidate;
2359        smbdirect_mr->sgl_count = num_pages;
2360        sg_init_table(smbdirect_mr->sgl, num_pages);
2361
2362        log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
2363                        num_pages, offset, tailsz);
2364
2365        if (num_pages == 1) {
2366                sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
2367                goto skip_multiple_pages;
2368        }
2369
2370        /* We have at least two pages to register */
2371        sg_set_page(
2372                &smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
2373        i = 1;
2374        while (i < num_pages - 1) {
2375                sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2376                i++;
2377        }
2378        sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2379                tailsz ? tailsz : PAGE_SIZE, 0);
2380
2381skip_multiple_pages:
2382        dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2383        smbdirect_mr->dir = dir;
2384        rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2385        if (!rc) {
2386                log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2387                        num_pages, dir, rc);
2388                goto dma_map_error;
2389        }
2390
2391        rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2392                NULL, PAGE_SIZE);
2393        if (rc != num_pages) {
2394                log_rdma_mr(ERR,
2395                        "ib_map_mr_sg failed rc = %d num_pages = %x\n",
2396                        rc, num_pages);
2397                goto map_mr_error;
2398        }
2399
2400        ib_update_fast_reg_key(smbdirect_mr->mr,
2401                ib_inc_rkey(smbdirect_mr->mr->rkey));
2402        reg_wr = &smbdirect_mr->wr;
2403        reg_wr->wr.opcode = IB_WR_REG_MR;
2404        smbdirect_mr->cqe.done = register_mr_done;
2405        reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2406        reg_wr->wr.num_sge = 0;
2407        reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2408        reg_wr->mr = smbdirect_mr->mr;
2409        reg_wr->key = smbdirect_mr->mr->rkey;
2410        reg_wr->access = writing ?
2411                        IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2412                        IB_ACCESS_REMOTE_READ;
2413
2414        /*
2415         * There is no need to wait for completion of ib_post_send
2416         * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2417         * on the next ib_post_send when we actually send I/O to the remote peer
2418         */
2419        rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
2420        if (!rc)
2421                return smbdirect_mr;
2422
2423        log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2424                rc, reg_wr->key);
2425
2426        /* If all failed, attempt to recover this MR by setting it to MR_ERROR */
2427map_mr_error:
2428        ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2429                smbdirect_mr->sgl_count, smbdirect_mr->dir);
2430
2431dma_map_error:
2432        smbdirect_mr->state = MR_ERROR;
2433        if (atomic_dec_and_test(&info->mr_used_count))
2434                wake_up(&info->wait_for_mr_cleanup);
2435
2436        smbd_disconnect_rdma_connection(info);
2437
2438        return NULL;
2439}
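
/*
 * Sketch, illustrative only: a successful registration is typically
 * advertised to the peer as a buffer descriptor so the peer can issue
 * RDMA read/write against it. The field mapping mirrors how the SMB3
 * layer fills struct smbd_buffer_descriptor_v1 from the returned MR;
 * the helper name is hypothetical.
 */
static void example_fill_descriptor(struct smbd_buffer_descriptor_v1 *v1,
                struct smbd_mr *mr)
{
        v1->offset = cpu_to_le64(mr->mr->iova);
        v1->token = cpu_to_le32(mr->mr->rkey);
        v1->length = cpu_to_le32(mr->mr->length);
}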
2440
2441static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2442{
2443        struct smbd_mr *smbdirect_mr;
2444        struct ib_cqe *cqe;
2445
2446        cqe = wc->wr_cqe;
2447        smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2448        smbdirect_mr->state = MR_INVALIDATED;
2449        if (wc->status != IB_WC_SUCCESS) {
2450                log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2451                smbdirect_mr->state = MR_ERROR;
2452        }
2453        complete(&smbdirect_mr->invalidate_done);
2454}
2455
2456/*
2457 * Deregister an MR after I/O is done
2458 * This function may wait if remote invalidation is not used
2459 * and we have to locally invalidate the buffer to prevent the data from being
2460 * modified by the remote peer after the upper layer consumes it
2461 */
2462int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2463{
2464        struct ib_send_wr *wr;
2465        struct smbd_connection *info = smbdirect_mr->conn;
2466        int rc = 0;
2467
2468        if (smbdirect_mr->need_invalidate) {
2469                /* Need to finish local invalidation before returning */
2470                wr = &smbdirect_mr->inv_wr;
2471                wr->opcode = IB_WR_LOCAL_INV;
2472                smbdirect_mr->cqe.done = local_inv_done;
2473                wr->wr_cqe = &smbdirect_mr->cqe;
2474                wr->num_sge = 0;
2475                wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2476                wr->send_flags = IB_SEND_SIGNALED;
2477
2478                init_completion(&smbdirect_mr->invalidate_done);
2479                rc = ib_post_send(info->id->qp, wr, NULL);
2480                if (rc) {
2481                        log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2482                        smbd_disconnect_rdma_connection(info);
2483                        goto done;
2484                }
2485                wait_for_completion(&smbdirect_mr->invalidate_done);
2486                smbdirect_mr->need_invalidate = false;
2487        } else
2488                /*
2489                 * For remote invalidation, just set it to MR_INVALIDATED
2490                 * and defer to mr_recovery_work to recover the MR for next use
2491                 */
2492                smbdirect_mr->state = MR_INVALIDATED;
2493
2494        if (smbdirect_mr->state == MR_INVALIDATED) {
2495                ib_dma_unmap_sg(
2496                        info->id->device, smbdirect_mr->sgl,
2497                        smbdirect_mr->sgl_count,
2498                        smbdirect_mr->dir);
2499                smbdirect_mr->state = MR_READY;
2500                if (atomic_inc_return(&info->mr_ready_count) == 1)
2501                        wake_up_interruptible(&info->wait_mr);
2502        } else
2503                /*
2504                 * Schedule the work to do MR recovery for future I/Os.
2505                 * MR recovery is slow and we don't want it to block the current I/O
2506                 */
2507                queue_work(info->workqueue, &info->mr_recovery_work);
2508
2509done:
2510        if (atomic_dec_and_test(&info->mr_used_count))
2511                wake_up(&info->wait_for_mr_cleanup);
2512
2513        return rc;
2514}
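
/*
 * Lifecycle sketch, illustrative only: a direct-placement I/O registers
 * the pages, lets the peer perform the RDMA transfer, then deregisters
 * the MR so it can be recovered for reuse. The helper name, flags and
 * error handling are placeholders for what the SMB3 read/write path
 * actually does.
 */
static int example_direct_read(struct smbd_connection *info,
                struct page *pages[], int num_pages, int tailsz)
{
        struct smbd_mr *mr;

        /* Peer will RDMA-write into these pages (an SMB read for us) */
        mr = smbd_register_mr(info, pages, num_pages, 0, tailsz,
                        true /* writing */, true /* need_invalidate */);
        if (!mr)
                return -EAGAIN;

        /*
         * ... send the request carrying the descriptor built from mr,
         * then wait for the server's response ...
         */

        /* Locally invalidate (if needed) and release the MR for reuse */
        return smbd_deregister_mr(mr);
}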
2515