linux/fs/cifs/smbdirect.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 *   Copyright (C) 2017, Microsoft Corporation.
   4 *
   5 *   Author(s): Long Li <longli@microsoft.com>
   6 */
   7#include <linux/module.h>
   8#include <linux/highmem.h>
   9#include "smbdirect.h"
  10#include "cifs_debug.h"
  11#include "cifsproto.h"
  12#include "smb2proto.h"
  13
  14static struct smbd_response *get_empty_queue_buffer(
  15                struct smbd_connection *info);
  16static struct smbd_response *get_receive_buffer(
  17                struct smbd_connection *info);
  18static void put_receive_buffer(
  19                struct smbd_connection *info,
  20                struct smbd_response *response);
  21static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  22static void destroy_receive_buffers(struct smbd_connection *info);
  23
  24static void put_empty_packet(
  25                struct smbd_connection *info, struct smbd_response *response);
  26static void enqueue_reassembly(
  27                struct smbd_connection *info,
  28                struct smbd_response *response, int data_length);
  29static struct smbd_response *_get_first_reassembly(
  30                struct smbd_connection *info);
  31
  32static int smbd_post_recv(
  33                struct smbd_connection *info,
  34                struct smbd_response *response);
  35
  36static int smbd_post_send_empty(struct smbd_connection *info);
  37static int smbd_post_send_data(
  38                struct smbd_connection *info,
  39                struct kvec *iov, int n_vec, int remaining_data_length);
  40static int smbd_post_send_page(struct smbd_connection *info,
  41                struct page *page, unsigned long offset,
  42                size_t size, int remaining_data_length);
  43
  44static void destroy_mr_list(struct smbd_connection *info);
  45static int allocate_mr_list(struct smbd_connection *info);
  46
  47/* SMBD version number */
  48#define SMBD_V1 0x0100
  49
  50/* Port numbers for SMBD transport */
  51#define SMB_PORT        445
  52#define SMBD_PORT       5445
  53
  54/* Address lookup and resolve timeout in ms */
  55#define RDMA_RESOLVE_TIMEOUT    5000
  56
  57/* SMBD negotiation timeout in seconds */
  58#define SMBD_NEGOTIATE_TIMEOUT  120
  59
  60/* SMBD minimum receive size and fragmented size defined in [MS-SMBD] */
  61#define SMBD_MIN_RECEIVE_SIZE           128
  62#define SMBD_MIN_FRAGMENTED_SIZE        131072
  63
  64/*
  65 * Default maximum number of RDMA read/write outstanding on this connection
  66 * This value may be decreased during QP creation, based on hardware limits
  67 */
  68#define SMBD_CM_RESPONDER_RESOURCES     32
  69
  70/* Maximum number of retries on data transfer operations */
  71#define SMBD_CM_RETRY                   6
  72/* No need to retry on Receiver Not Ready since SMBD manages credits */
  73#define SMBD_CM_RNR_RETRY               0
  74
  75/*
  76 * User configurable initial values per SMBD transport connection
  77 * as defined in [MS-SMBD] 3.1.1.1
  78 * These may change after SMBD negotiation
  79 */
  80/* The local peer's maximum number of credits to grant to the peer */
  81int smbd_receive_credit_max = 255;
  82
  83/* The number of send credits to request from the remote peer */
  84int smbd_send_credit_target = 255;
  85
  86/* The maximum single message size that can be sent to the remote peer */
  87int smbd_max_send_size = 1364;
  88
  89/*  The maximum fragmented upper-layer payload receive size supported */
  90int smbd_max_fragmented_recv_size = 1024 * 1024;
  91
  92/*  The maximum single-message size which can be received */
  93int smbd_max_receive_size = 8192;
  94
  95/* The timeout to initiate send of a keepalive message on idle */
  96int smbd_keep_alive_interval = 120;
  97
  98/*
  99 * User configurable initial values for RDMA transport
 100 * The actual values used may be lower and are limited by hardware capabilities
 101 */
 102/* Default maximum number of SGEs in a RDMA write/read */
 103int smbd_max_frmr_depth = 2048;
 104
 105/* If payload is smaller than this in bytes, use RDMA send/recv, not read/write */
 106int rdma_readwrite_threshold = 4096;
 107
 108/* Transport logging functions
 109 * Logging is defined as classes. They can be OR'ed to define the actual
 110 * logging level via module parameter smbd_logging_class
 111 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 112 * log_rdma_event()
 113 */
 114#define LOG_OUTGOING                    0x1
 115#define LOG_INCOMING                    0x2
 116#define LOG_READ                        0x4
 117#define LOG_WRITE                       0x8
 118#define LOG_RDMA_SEND                   0x10
 119#define LOG_RDMA_RECV                   0x20
 120#define LOG_KEEP_ALIVE                  0x40
 121#define LOG_RDMA_EVENT                  0x80
 122#define LOG_RDMA_MR                     0x100
 123static unsigned int smbd_logging_class;
 124module_param(smbd_logging_class, uint, 0644);
 125MODULE_PARM_DESC(smbd_logging_class,
 126        "Logging class for SMBD transport 0x0 to 0x100");
 127
 128#define ERR             0x0
 129#define INFO            0x1
 130static unsigned int smbd_logging_level = ERR;
 131module_param(smbd_logging_level, uint, 0644);
 132MODULE_PARM_DESC(smbd_logging_level,
 133        "Logging level for SMBD transport, 0 (default): error, 1: info");
 134
 135#define log_rdma(level, class, fmt, args...)                            \
 136do {                                                                    \
 137        if (level <= smbd_logging_level || class & smbd_logging_class)  \
 138                cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 139} while (0)
 140
 141#define log_outgoing(level, fmt, args...) \
 142                log_rdma(level, LOG_OUTGOING, fmt, ##args)
 143#define log_incoming(level, fmt, args...) \
 144                log_rdma(level, LOG_INCOMING, fmt, ##args)
 145#define log_read(level, fmt, args...)   log_rdma(level, LOG_READ, fmt, ##args)
 146#define log_write(level, fmt, args...)  log_rdma(level, LOG_WRITE, fmt, ##args)
 147#define log_rdma_send(level, fmt, args...) \
 148                log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 149#define log_rdma_recv(level, fmt, args...) \
 150                log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 151#define log_keep_alive(level, fmt, args...) \
 152                log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 153#define log_rdma_event(level, fmt, args...) \
 154                log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 155#define log_rdma_mr(level, fmt, args...) \
 156                log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 157
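    /*
     * Work handler that disconnects the RDMA connection: if the transport
     * is still connected, mark it as disconnecting and call
     * rdma_disconnect() on the connection's cm_id.
     */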
 158static void smbd_disconnect_rdma_work(struct work_struct *work)
 159{
 160        struct smbd_connection *info =
 161                container_of(work, struct smbd_connection, disconnect_work);
 162
 163        if (info->transport_status == SMBD_CONNECTED) {
 164                info->transport_status = SMBD_DISCONNECTING;
 165                rdma_disconnect(info->id);
 166        }
 167}
 168
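    /* Schedule the disconnect work on the connection's workqueue */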
 169static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 170{
 171        queue_work(info->workqueue, &info->disconnect_work);
 172}
 173
 174/* Upcall from RDMA CM */
 175static int smbd_conn_upcall(
 176                struct rdma_cm_id *id, struct rdma_cm_event *event)
 177{
 178        struct smbd_connection *info = id->context;
 179
 180        log_rdma_event(INFO, "event=%d status=%d\n",
 181                event->event, event->status);
 182
 183        switch (event->event) {
 184        case RDMA_CM_EVENT_ADDR_RESOLVED:
 185        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 186                info->ri_rc = 0;
 187                complete(&info->ri_done);
 188                break;
 189
 190        case RDMA_CM_EVENT_ADDR_ERROR:
 191                info->ri_rc = -EHOSTUNREACH;
 192                complete(&info->ri_done);
 193                break;
 194
 195        case RDMA_CM_EVENT_ROUTE_ERROR:
 196                info->ri_rc = -ENETUNREACH;
 197                complete(&info->ri_done);
 198                break;
 199
 200        case RDMA_CM_EVENT_ESTABLISHED:
 201                log_rdma_event(INFO, "connected event=%d\n", event->event);
 202                info->transport_status = SMBD_CONNECTED;
 203                wake_up_interruptible(&info->conn_wait);
 204                break;
 205
 206        case RDMA_CM_EVENT_CONNECT_ERROR:
 207        case RDMA_CM_EVENT_UNREACHABLE:
 208        case RDMA_CM_EVENT_REJECTED:
 209                log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 210                info->transport_status = SMBD_DISCONNECTED;
 211                wake_up_interruptible(&info->conn_wait);
 212                break;
 213
 214        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 215        case RDMA_CM_EVENT_DISCONNECTED:
 216                /* This happens when we fail the negotiation */
 217                if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 218                        info->transport_status = SMBD_DISCONNECTED;
 219                        wake_up(&info->conn_wait);
 220                        break;
 221                }
 222
 223                info->transport_status = SMBD_DISCONNECTED;
 224                wake_up_interruptible(&info->disconn_wait);
 225                wake_up_interruptible(&info->wait_reassembly_queue);
 226                wake_up_interruptible_all(&info->wait_send_queue);
 227                break;
 228
 229        default:
 230                break;
 231        }
 232
 233        return 0;
 234}
 235
 236/* Upcall from RDMA QP */
 237static void
 238smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 239{
 240        struct smbd_connection *info = context;
 241
 242        log_rdma_event(ERR, "%s on device %s info %p\n",
 243                ib_event_msg(event->event), event->device->name, info);
 244
 245        switch (event->event) {
 246        case IB_EVENT_CQ_ERR:
 247        case IB_EVENT_QP_FATAL:
 248                smbd_disconnect_rdma_connection(info);
 249                break;
 250
 251        default:
 252                break;
 253        }
 254}
 255
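    /* The SMBD packet payload follows the request/response descriptor in memory */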
 256static inline void *smbd_request_payload(struct smbd_request *request)
 257{
 258        return (void *)request->packet;
 259}
 260
 261static inline void *smbd_response_payload(struct smbd_response *response)
 262{
 263        return (void *)response->packet;
 264}
 265
 266/* Called when an RDMA send is done */
 267static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 268{
 269        int i;
 270        struct smbd_request *request =
 271                container_of(wc->wr_cqe, struct smbd_request, cqe);
 272
 273        log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 274                request, wc->status);
 275
 276        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 277                log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 278                        wc->status, wc->opcode);
 279                smbd_disconnect_rdma_connection(request->info);
 280        }
 281
 282        for (i = 0; i < request->num_sge; i++)
 283                ib_dma_unmap_single(request->info->id->device,
 284                        request->sge[i].addr,
 285                        request->sge[i].length,
 286                        DMA_TO_DEVICE);
 287
 288        if (atomic_dec_and_test(&request->info->send_pending))
 289                wake_up(&request->info->wait_send_pending);
 290
 291        wake_up(&request->info->wait_post_send);
 292
 293        mempool_free(request, request->info->request_mempool);
 294}
 295
 296static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 297{
 298        log_rdma_event(INFO, "resp message min_version %u max_version %u negotiated_version %u credits_requested %u credits_granted %u status %u max_readwrite_size %u preferred_send_size %u max_receive_size %u max_fragmented_size %u\n",
 299                       resp->min_version, resp->max_version,
 300                       resp->negotiated_version, resp->credits_requested,
 301                       resp->credits_granted, resp->status,
 302                       resp->max_readwrite_size, resp->preferred_send_size,
 303                       resp->max_receive_size, resp->max_fragmented_size);
 304}
 305
 306/*
 307 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
 308 * response, packet_length: the negotiation response message
 309 * return value: true if negotiation succeeded, false if it failed
 310 */
 311static bool process_negotiation_response(
 312                struct smbd_response *response, int packet_length)
 313{
 314        struct smbd_connection *info = response->info;
 315        struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 316
 317        if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 318                log_rdma_event(ERR,
 319                        "error: packet_length=%d\n", packet_length);
 320                return false;
 321        }
 322
 323        if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 324                log_rdma_event(ERR, "error: negotiated_version=%x\n",
 325                        le16_to_cpu(packet->negotiated_version));
 326                return false;
 327        }
 328        info->protocol = le16_to_cpu(packet->negotiated_version);
 329
 330        if (packet->credits_requested == 0) {
 331                log_rdma_event(ERR, "error: credits_requested==0\n");
 332                return false;
 333        }
 334        info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 335
 336        if (packet->credits_granted == 0) {
 337                log_rdma_event(ERR, "error: credits_granted==0\n");
 338                return false;
 339        }
 340        atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 341
 342        atomic_set(&info->receive_credits, 0);
 343
 344        if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 345                log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 346                        le32_to_cpu(packet->preferred_send_size));
 347                return false;
 348        }
 349        info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 350
 351        if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 352                log_rdma_event(ERR, "error: max_receive_size=%d\n",
 353                        le32_to_cpu(packet->max_receive_size));
 354                return false;
 355        }
 356        info->max_send_size = min_t(int, info->max_send_size,
 357                                        le32_to_cpu(packet->max_receive_size));
 358
 359        if (le32_to_cpu(packet->max_fragmented_size) <
 360                        SMBD_MIN_FRAGMENTED_SIZE) {
 361                log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 362                        le32_to_cpu(packet->max_fragmented_size));
 363                return false;
 364        }
 365        info->max_fragmented_send_size =
 366                le32_to_cpu(packet->max_fragmented_size);
 367        info->rdma_readwrite_threshold =
 368                rdma_readwrite_threshold > info->max_fragmented_send_size ?
 369                info->max_fragmented_send_size :
 370                rdma_readwrite_threshold;
 371
 372
 373        info->max_readwrite_size = min_t(u32,
 374                        le32_to_cpu(packet->max_readwrite_size),
 375                        info->max_frmr_depth * PAGE_SIZE);
 376        info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 377
 378        return true;
 379}
 380
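    /*
     * Work handler that extends receive credits: post as many receives as
     * needed to reach receive_credit_target, drawing buffers first from the
     * receive queue and then from the empty packet queue, and record the
     * newly offered credits. If credits are still below target, send an
     * empty message so the credit grant reaches the peer promptly.
     */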
 381static void smbd_post_send_credits(struct work_struct *work)
 382{
 383        int ret = 0;
 384        int use_receive_queue = 1;
 385        int rc;
 386        struct smbd_response *response;
 387        struct smbd_connection *info =
 388                container_of(work, struct smbd_connection,
 389                        post_send_credits_work);
 390
 391        if (info->transport_status != SMBD_CONNECTED) {
 392                wake_up(&info->wait_receive_queues);
 393                return;
 394        }
 395
 396        if (info->receive_credit_target >
 397                atomic_read(&info->receive_credits)) {
 398                while (true) {
 399                        if (use_receive_queue)
 400                                response = get_receive_buffer(info);
 401                        else
 402                                response = get_empty_queue_buffer(info);
 403                        if (!response) {
 404                                /* now switch to the empty packet queue */
 405                                if (use_receive_queue) {
 406                                        use_receive_queue = 0;
 407                                        continue;
 408                                } else
 409                                        break;
 410                        }
 411
 412                        response->type = SMBD_TRANSFER_DATA;
 413                        response->first_segment = false;
 414                        rc = smbd_post_recv(info, response);
 415                        if (rc) {
 416                                log_rdma_recv(ERR,
 417                                        "post_recv failed rc=%d\n", rc);
 418                                put_receive_buffer(info, response);
 419                                break;
 420                        }
 421
 422                        ret++;
 423                }
 424        }
 425
 426        spin_lock(&info->lock_new_credits_offered);
 427        info->new_credits_offered += ret;
 428        spin_unlock(&info->lock_new_credits_offered);
 429
 430        /* Promptly send an immediate packet as defined in [MS-SMBD] 3.1.1.1 */
 431        info->send_immediate = true;
 432        if (atomic_read(&info->receive_credits) <
 433                info->receive_credit_target - 1) {
 434                if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
 435                    info->send_immediate) {
 436                        log_keep_alive(INFO, "send an empty message\n");
 437                        smbd_post_send_empty(info);
 438                }
 439        }
 440}
 441
 442/* Called from softirq, when recv is done */
 443static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 444{
 445        struct smbd_data_transfer *data_transfer;
 446        struct smbd_response *response =
 447                container_of(wc->wr_cqe, struct smbd_response, cqe);
 448        struct smbd_connection *info = response->info;
 449        int data_length = 0;
 450
 451        log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d byte_len=%d pkey_index=%x\n",
 452                      response, response->type, wc->status, wc->opcode,
 453                      wc->byte_len, wc->pkey_index);
 454
 455        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 456                log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 457                        wc->status, wc->opcode);
 458                smbd_disconnect_rdma_connection(info);
 459                goto error;
 460        }
 461
 462        ib_dma_sync_single_for_cpu(
 463                wc->qp->device,
 464                response->sge.addr,
 465                response->sge.length,
 466                DMA_FROM_DEVICE);
 467
 468        switch (response->type) {
 469        /* SMBD negotiation response */
 470        case SMBD_NEGOTIATE_RESP:
 471                dump_smbd_negotiate_resp(smbd_response_payload(response));
 472                info->full_packet_received = true;
 473                info->negotiate_done =
 474                        process_negotiation_response(response, wc->byte_len);
 475                complete(&info->negotiate_completion);
 476                break;
 477
 478        /* SMBD data transfer packet */
 479        case SMBD_TRANSFER_DATA:
 480                data_transfer = smbd_response_payload(response);
 481                data_length = le32_to_cpu(data_transfer->data_length);
 482
 483                /*
 484                 * If this is a packet with a data payload, place the data in
 485                 * the reassembly queue and wake up the reading thread
 486                 */
 487                if (data_length) {
 488                        if (info->full_packet_received)
 489                                response->first_segment = true;
 490
 491                        if (le32_to_cpu(data_transfer->remaining_data_length))
 492                                info->full_packet_received = false;
 493                        else
 494                                info->full_packet_received = true;
 495
 496                        enqueue_reassembly(
 497                                info,
 498                                response,
 499                                data_length);
 500                } else
 501                        put_empty_packet(info, response);
 502
 503                if (data_length)
 504                        wake_up_interruptible(&info->wait_reassembly_queue);
 505
 506                atomic_dec(&info->receive_credits);
 507                info->receive_credit_target =
 508                        le16_to_cpu(data_transfer->credits_requested);
 509                if (le16_to_cpu(data_transfer->credits_granted)) {
 510                        atomic_add(le16_to_cpu(data_transfer->credits_granted),
 511                                &info->send_credits);
 512                        /*
 513                         * We have new send credits granted from remote peer
 514                         * If any sender is waiting for credits, unblock it
 515                         */
 516                        wake_up_interruptible(&info->wait_send_queue);
 517                }
 518
 519                log_incoming(INFO, "data flags %d data_offset %d data_length %d remaining_data_length %d\n",
 520                             le16_to_cpu(data_transfer->flags),
 521                             le32_to_cpu(data_transfer->data_offset),
 522                             le32_to_cpu(data_transfer->data_length),
 523                             le32_to_cpu(data_transfer->remaining_data_length));
 524
 525                /* Send a KEEP_ALIVE response right away if requested */
 526                info->keep_alive_requested = KEEP_ALIVE_NONE;
 527                if (le16_to_cpu(data_transfer->flags) &
 528                                SMB_DIRECT_RESPONSE_REQUESTED) {
 529                        info->keep_alive_requested = KEEP_ALIVE_PENDING;
 530                }
 531
 532                return;
 533
 534        default:
 535                log_rdma_recv(ERR,
 536                        "unexpected response type=%d\n", response->type);
 537        }
 538
 539error:
 540        put_receive_buffer(info, response);
 541}
 542
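    /*
     * Create an RDMA CM id for this connection and resolve the destination
     * address and route, waiting up to RDMA_RESOLVE_TIMEOUT ms for each step.
     * return value: the cm_id, or an ERR_PTR on failure
     */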
 543static struct rdma_cm_id *smbd_create_id(
 544                struct smbd_connection *info,
 545                struct sockaddr *dstaddr, int port)
 546{
 547        struct rdma_cm_id *id;
 548        int rc;
 549        __be16 *sport;
 550
 551        id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 552                RDMA_PS_TCP, IB_QPT_RC);
 553        if (IS_ERR(id)) {
 554                rc = PTR_ERR(id);
 555                log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 556                return id;
 557        }
 558
 559        if (dstaddr->sa_family == AF_INET6)
 560                sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 561        else
 562                sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 563
 564        *sport = htons(port);
 565
 566        init_completion(&info->ri_done);
 567        info->ri_rc = -ETIMEDOUT;
 568
 569        rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 570                RDMA_RESOLVE_TIMEOUT);
 571        if (rc) {
 572                log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 573                goto out;
 574        }
 575        rc = wait_for_completion_interruptible_timeout(
 576                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 577        /* e.g. if interrupted returns -ERESTARTSYS */
 578        if (rc < 0) {
 579                log_rdma_event(ERR, "rdma_resolve_addr timeout rc: %i\n", rc);
 580                goto out;
 581        }
 582        rc = info->ri_rc;
 583        if (rc) {
 584                log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 585                goto out;
 586        }
 587
 588        info->ri_rc = -ETIMEDOUT;
 589        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 590        if (rc) {
 591                log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 592                goto out;
 593        }
 594        rc = wait_for_completion_interruptible_timeout(
 595                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 596        /* e.g. if interrupted returns -ERESTARTSYS */
 597        if (rc < 0)  {
 598                log_rdma_event(ERR, "rdma_resolve_route timeout rc: %i\n", rc);
 599                goto out;
 600        }
 601        rc = info->ri_rc;
 602        if (rc) {
 603                log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 604                goto out;
 605        }
 606
 607        return id;
 608
 609out:
 610        rdma_destroy_id(id);
 611        return ERR_PTR(rc);
 612}
 613
 614/*
 615 * Test if FRWR (Fast Registration Work Requests) is supported on the device
 616 * This implementation requires FRWR for RDMA read/write
 617 * return value: true if it is supported
 618 */
 619static bool frwr_is_supported(struct ib_device_attr *attrs)
 620{
 621        if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 622                return false;
 623        if (attrs->max_fast_reg_page_list_len == 0)
 624                return false;
 625        return true;
 626}
 627
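    /*
     * Open the RDMA "interface adapter": create and resolve the cm_id, check
     * that the device supports FRWR, choose the MR type and maximum FRMR
     * depth, and allocate a protection domain.
     */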
 628static int smbd_ia_open(
 629                struct smbd_connection *info,
 630                struct sockaddr *dstaddr, int port)
 631{
 632        int rc;
 633
 634        info->id = smbd_create_id(info, dstaddr, port);
 635        if (IS_ERR(info->id)) {
 636                rc = PTR_ERR(info->id);
 637                goto out1;
 638        }
 639
 640        if (!frwr_is_supported(&info->id->device->attrs)) {
 641                log_rdma_event(ERR, "Fast Registration Work Requests (FRWR) is not supported\n");
 642                log_rdma_event(ERR, "Device capability flags = %llx max_fast_reg_page_list_len = %u\n",
 643                               info->id->device->attrs.device_cap_flags,
 644                               info->id->device->attrs.max_fast_reg_page_list_len);
 645                rc = -EPROTONOSUPPORT;
 646                goto out2;
 647        }
 648        info->max_frmr_depth = min_t(int,
 649                smbd_max_frmr_depth,
 650                info->id->device->attrs.max_fast_reg_page_list_len);
 651        info->mr_type = IB_MR_TYPE_MEM_REG;
 652        if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 653                info->mr_type = IB_MR_TYPE_SG_GAPS;
 654
 655        info->pd = ib_alloc_pd(info->id->device, 0);
 656        if (IS_ERR(info->pd)) {
 657                rc = PTR_ERR(info->pd);
 658                log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 659                goto out2;
 660        }
 661
 662        return 0;
 663
 664out2:
 665        rdma_destroy_id(info->id);
 666        info->id = NULL;
 667
 668out1:
 669        return rc;
 670}
 671
 672/*
 673 * Send a negotiation request message to the peer
 674 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 675 * After negotiation, the transport is connected and ready for
 676 * carrying upper layer SMB payload
 677 */
 678static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 679{
 680        struct ib_send_wr send_wr;
 681        int rc = -ENOMEM;
 682        struct smbd_request *request;
 683        struct smbd_negotiate_req *packet;
 684
 685        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 686        if (!request)
 687                return rc;
 688
 689        request->info = info;
 690
 691        packet = smbd_request_payload(request);
 692        packet->min_version = cpu_to_le16(SMBD_V1);
 693        packet->max_version = cpu_to_le16(SMBD_V1);
 694        packet->reserved = 0;
 695        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 696        packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 697        packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 698        packet->max_fragmented_size =
 699                cpu_to_le32(info->max_fragmented_recv_size);
 700
 701        request->num_sge = 1;
 702        request->sge[0].addr = ib_dma_map_single(
 703                                info->id->device, (void *)packet,
 704                                sizeof(*packet), DMA_TO_DEVICE);
 705        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 706                rc = -EIO;
 707                goto dma_mapping_failed;
 708        }
 709
 710        request->sge[0].length = sizeof(*packet);
 711        request->sge[0].lkey = info->pd->local_dma_lkey;
 712
 713        ib_dma_sync_single_for_device(
 714                info->id->device, request->sge[0].addr,
 715                request->sge[0].length, DMA_TO_DEVICE);
 716
 717        request->cqe.done = send_done;
 718
 719        send_wr.next = NULL;
 720        send_wr.wr_cqe = &request->cqe;
 721        send_wr.sg_list = request->sge;
 722        send_wr.num_sge = request->num_sge;
 723        send_wr.opcode = IB_WR_SEND;
 724        send_wr.send_flags = IB_SEND_SIGNALED;
 725
 726        log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 727                request->sge[0].addr,
 728                request->sge[0].length, request->sge[0].lkey);
 729
 730        atomic_inc(&info->send_pending);
 731        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 732        if (!rc)
 733                return 0;
 734
 735        /* if we reach here, post send failed */
 736        log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 737        atomic_dec(&info->send_pending);
 738        ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 739                request->sge[0].length, DMA_TO_DEVICE);
 740
 741        smbd_disconnect_rdma_connection(info);
 742
 743dma_mapping_failed:
 744        mempool_free(request, info->request_mempool);
 745        return rc;
 746}
 747
 748/*
 749 * Extend the credits to remote peer
 750 * This implements [MS-SMBD] 3.1.5.9
 751 * The idea is that we should extend credits to the remote peer as quickly as
 752 * allowed, to maintain data flow. We allocate as many receive
 753 * buffers as possible, and extend the receive credits to the remote peer
 754 * return value: the new credits being granted.
 755 */
 756static int manage_credits_prior_sending(struct smbd_connection *info)
 757{
 758        int new_credits;
 759
 760        spin_lock(&info->lock_new_credits_offered);
 761        new_credits = info->new_credits_offered;
 762        info->new_credits_offered = 0;
 763        spin_unlock(&info->lock_new_credits_offered);
 764
 765        return new_credits;
 766}
 767
 768/*
 769 * Check if we need to send a KEEP_ALIVE message
 770 * The idle connection timer triggers a KEEP_ALIVE message when it expires
 771 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have the peer
 772 * send back a response.
 773 * return value:
 774 * 1: if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 775 * 0: otherwise
 776 */
 777static int manage_keep_alive_before_sending(struct smbd_connection *info)
 778{
 779        if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 780                info->keep_alive_requested = KEEP_ALIVE_SENT;
 781                return 1;
 782        }
 783        return 0;
 784}
 785
 786/* Post the send request */
 787static int smbd_post_send(struct smbd_connection *info,
 788                struct smbd_request *request)
 789{
 790        struct ib_send_wr send_wr;
 791        int rc, i;
 792
 793        for (i = 0; i < request->num_sge; i++) {
 794                log_rdma_send(INFO,
 795                        "rdma_request sge[%d] addr=%llu length=%u\n",
 796                        i, request->sge[i].addr, request->sge[i].length);
 797                ib_dma_sync_single_for_device(
 798                        info->id->device,
 799                        request->sge[i].addr,
 800                        request->sge[i].length,
 801                        DMA_TO_DEVICE);
 802        }
 803
 804        request->cqe.done = send_done;
 805
 806        send_wr.next = NULL;
 807        send_wr.wr_cqe = &request->cqe;
 808        send_wr.sg_list = request->sge;
 809        send_wr.num_sge = request->num_sge;
 810        send_wr.opcode = IB_WR_SEND;
 811        send_wr.send_flags = IB_SEND_SIGNALED;
 812
 813        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 814        if (rc) {
 815                log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 816                smbd_disconnect_rdma_connection(info);
 817                rc = -EAGAIN;
 818        } else
 819                /* Reset timer for idle connection after packet is sent */
 820                mod_delayed_work(info->workqueue, &info->idle_timer_work,
 821                        info->keep_alive_interval*HZ);
 822
 823        return rc;
 824}
 825
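    /*
     * Build and send one SMBD data transfer packet carrying the payload
     * described by sgl. Waits for a send credit and for room on the send
     * queue, fills in the packet header, DMA-maps the header and payload,
     * and posts the send. On failure, credits taken or offered are rolled
     * back.
     */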
 826static int smbd_post_send_sgl(struct smbd_connection *info,
 827        struct scatterlist *sgl, int data_length, int remaining_data_length)
 828{
 829        int num_sgs;
 830        int i, rc;
 831        int header_length;
 832        struct smbd_request *request;
 833        struct smbd_data_transfer *packet;
 834        int new_credits;
 835        struct scatterlist *sg;
 836
 837wait_credit:
 838        /* Wait for send credits. A SMBD packet needs one credit */
 839        rc = wait_event_interruptible(info->wait_send_queue,
 840                atomic_read(&info->send_credits) > 0 ||
 841                info->transport_status != SMBD_CONNECTED);
 842        if (rc)
 843                goto err_wait_credit;
 844
 845        if (info->transport_status != SMBD_CONNECTED) {
 846                log_outgoing(ERR, "disconnected not sending on wait_credit\n");
 847                rc = -EAGAIN;
 848                goto err_wait_credit;
 849        }
 850        if (unlikely(atomic_dec_return(&info->send_credits) < 0)) {
 851                atomic_inc(&info->send_credits);
 852                goto wait_credit;
 853        }
 854
 855wait_send_queue:
 856        wait_event(info->wait_post_send,
 857                atomic_read(&info->send_pending) < info->send_credit_target ||
 858                info->transport_status != SMBD_CONNECTED);
 859
 860        if (info->transport_status != SMBD_CONNECTED) {
 861                log_outgoing(ERR, "disconnected not sending on wait_send_queue\n");
 862                rc = -EAGAIN;
 863                goto err_wait_send_queue;
 864        }
 865
 866        if (unlikely(atomic_inc_return(&info->send_pending) >
 867                                info->send_credit_target)) {
 868                atomic_dec(&info->send_pending);
 869                goto wait_send_queue;
 870        }
 871
 872        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 873        if (!request) {
 874                rc = -ENOMEM;
 875                goto err_alloc;
 876        }
 877
 878        request->info = info;
 879
 880        /* Fill in the packet header */
 881        packet = smbd_request_payload(request);
 882        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 883
 884        new_credits = manage_credits_prior_sending(info);
 885        atomic_add(new_credits, &info->receive_credits);
 886        packet->credits_granted = cpu_to_le16(new_credits);
 887
 888        info->send_immediate = false;
 889
 890        packet->flags = 0;
 891        if (manage_keep_alive_before_sending(info))
 892                packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 893
 894        packet->reserved = 0;
 895        if (!data_length)
 896                packet->data_offset = 0;
 897        else
 898                packet->data_offset = cpu_to_le32(24);
 899        packet->data_length = cpu_to_le32(data_length);
 900        packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 901        packet->padding = 0;
 902
 903        log_outgoing(INFO, "credits_requested=%d credits_granted=%d data_offset=%d data_length=%d remaining_data_length=%d\n",
 904                     le16_to_cpu(packet->credits_requested),
 905                     le16_to_cpu(packet->credits_granted),
 906                     le32_to_cpu(packet->data_offset),
 907                     le32_to_cpu(packet->data_length),
 908                     le32_to_cpu(packet->remaining_data_length));
 909
 910        /* Map the packet to DMA */
 911        header_length = sizeof(struct smbd_data_transfer);
 912        /* If this is a packet without payload, don't send padding */
 913        if (!data_length)
 914                header_length = offsetof(struct smbd_data_transfer, padding);
 915
 916        request->num_sge = 1;
 917        request->sge[0].addr = ib_dma_map_single(info->id->device,
 918                                                 (void *)packet,
 919                                                 header_length,
 920                                                 DMA_TO_DEVICE);
 921        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 922                rc = -EIO;
 923                request->sge[0].addr = 0;
 924                goto err_dma;
 925        }
 926
 927        request->sge[0].length = header_length;
 928        request->sge[0].lkey = info->pd->local_dma_lkey;
 929
 930        /* Fill in the packet data payload */
 931        num_sgs = sgl ? sg_nents(sgl) : 0;
 932        for_each_sg(sgl, sg, num_sgs, i) {
 933                request->sge[i+1].addr =
 934                        ib_dma_map_page(info->id->device, sg_page(sg),
 935                               sg->offset, sg->length, DMA_TO_DEVICE);
 936                if (ib_dma_mapping_error(
 937                                info->id->device, request->sge[i+1].addr)) {
 938                        rc = -EIO;
 939                        request->sge[i+1].addr = 0;
 940                        goto err_dma;
 941                }
 942                request->sge[i+1].length = sg->length;
 943                request->sge[i+1].lkey = info->pd->local_dma_lkey;
 944                request->num_sge++;
 945        }
 946
 947        rc = smbd_post_send(info, request);
 948        if (!rc)
 949                return 0;
 950
 951err_dma:
 952        for (i = 0; i < request->num_sge; i++)
 953                if (request->sge[i].addr)
 954                        ib_dma_unmap_single(info->id->device,
 955                                            request->sge[i].addr,
 956                                            request->sge[i].length,
 957                                            DMA_TO_DEVICE);
 958        mempool_free(request, info->request_mempool);
 959
 960        /* roll back receive credits and credits to be offered */
 961        spin_lock(&info->lock_new_credits_offered);
 962        info->new_credits_offered += new_credits;
 963        spin_unlock(&info->lock_new_credits_offered);
 964        atomic_sub(new_credits, &info->receive_credits);
 965
 966err_alloc:
 967        if (atomic_dec_and_test(&info->send_pending))
 968                wake_up(&info->wait_send_pending);
 969
 970err_wait_send_queue:
 971        /* roll back send credits and pending */
 972        atomic_inc(&info->send_credits);
 973
 974err_wait_credit:
 975        return rc;
 976}
 977
 978/*
 979 * Send a page
 980 * page: the page to send
 981 * offset: offset in the page to send
 982 * size: length in the page to send
 983 * remaining_data_length: remaining data to send in this payload
 984 */
 985static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
 986                unsigned long offset, size_t size, int remaining_data_length)
 987{
 988        struct scatterlist sgl;
 989
 990        sg_init_table(&sgl, 1);
 991        sg_set_page(&sgl, page, size, offset);
 992
 993        return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
 994}
 995
 996/*
 997 * Send an empty message
 998 * An empty message is used to extend credits to the peer, or as a keep-alive,
 999 * while there is no upper layer payload to send at the time
1000 */
1001static int smbd_post_send_empty(struct smbd_connection *info)
1002{
1003        info->count_send_empty++;
1004        return smbd_post_send_sgl(info, NULL, 0, 0);
1005}
1006
1007/*
1008 * Send a data buffer
1009 * iov: the iov array describing the data buffers
1010 * n_vec: number of entries in the iov array
1011 * remaining_data_length: remaining data to send following this packet
1012 * in a segmented SMBD payload
1013 */
1014static int smbd_post_send_data(
1015        struct smbd_connection *info, struct kvec *iov, int n_vec,
1016        int remaining_data_length)
1017{
1018        int i;
1019        u32 data_length = 0;
1020        struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1021
1022        if (n_vec > SMBDIRECT_MAX_SGE) {
1023                cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1024                return -EINVAL;
1025        }
1026
1027        sg_init_table(sgl, n_vec);
1028        for (i = 0; i < n_vec; i++) {
1029                data_length += iov[i].iov_len;
1030                sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1031        }
1032
1033        return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1034}
1035
1036/*
1037 * Post a receive request to the transport
1038 * The remote peer can only send data when a receive request is posted
1039 * The interaction is controlled by the send/receive credit system
1040 */
1041static int smbd_post_recv(
1042                struct smbd_connection *info, struct smbd_response *response)
1043{
1044        struct ib_recv_wr recv_wr;
1045        int rc = -EIO;
1046
1047        response->sge.addr = ib_dma_map_single(
1048                                info->id->device, response->packet,
1049                                info->max_receive_size, DMA_FROM_DEVICE);
1050        if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1051                return rc;
1052
1053        response->sge.length = info->max_receive_size;
1054        response->sge.lkey = info->pd->local_dma_lkey;
1055
1056        response->cqe.done = recv_done;
1057
1058        recv_wr.wr_cqe = &response->cqe;
1059        recv_wr.next = NULL;
1060        recv_wr.sg_list = &response->sge;
1061        recv_wr.num_sge = 1;
1062
1063        rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
1064        if (rc) {
1065                ib_dma_unmap_single(info->id->device, response->sge.addr,
1066                                    response->sge.length, DMA_FROM_DEVICE);
1067                smbd_disconnect_rdma_connection(info);
1068                log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1069        }
1070
1071        return rc;
1072}
1073
1074/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1075static int smbd_negotiate(struct smbd_connection *info)
1076{
1077        int rc;
1078        struct smbd_response *response = get_receive_buffer(info);
1079
1080        response->type = SMBD_NEGOTIATE_RESP;
1081        rc = smbd_post_recv(info, response);
1082        log_rdma_event(INFO, "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x iov.lkey=%x\n",
1083                       rc, response->sge.addr,
1084                       response->sge.length, response->sge.lkey);
1085        if (rc)
1086                return rc;
1087
1088        init_completion(&info->negotiate_completion);
1089        info->negotiate_done = false;
1090        rc = smbd_post_send_negotiate_req(info);
1091        if (rc)
1092                return rc;
1093
1094        rc = wait_for_completion_interruptible_timeout(
1095                &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1096        log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1097
1098        if (info->negotiate_done)
1099                return 0;
1100
1101        if (rc == 0)
1102                rc = -ETIMEDOUT;
1103        else if (rc == -ERESTARTSYS)
1104                rc = -EINTR;
1105        else
1106                rc = -ENOTCONN;
1107
1108        return rc;
1109}
1110
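    /*
     * Return a response buffer that carried no payload to the empty packet
     * queue and kick post_send_credits_work so it can be reposted.
     */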
1111static void put_empty_packet(
1112                struct smbd_connection *info, struct smbd_response *response)
1113{
1114        spin_lock(&info->empty_packet_queue_lock);
1115        list_add_tail(&response->list, &info->empty_packet_queue);
1116        info->count_empty_packet_queue++;
1117        spin_unlock(&info->empty_packet_queue_lock);
1118
1119        queue_work(info->workqueue, &info->post_send_credits_work);
1120}
1121
1122/*
1123 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
1124 * This is a queue for reassembling upper layer payload and presenting it to
1125 * the upper layer. All incoming payloads go to the reassembly queue, regardless
1126 * of whether reassembly is required. The upper layer code reads from the queue
1127 * for all incoming payloads.
1128 * Put a received packet to the reassembly queue
1129 * response: the packet received
1130 * data_length: the size of payload in this packet
1131 */
1132static void enqueue_reassembly(
1133        struct smbd_connection *info,
1134        struct smbd_response *response,
1135        int data_length)
1136{
1137        spin_lock(&info->reassembly_queue_lock);
1138        list_add_tail(&response->list, &info->reassembly_queue);
1139        info->reassembly_queue_length++;
1140        /*
1141         * Make sure reassembly_data_length is updated after list and
1142         * reassembly_queue_length are updated. On the dequeue side
1143         * reassembly_data_length is checked without a lock to determine
1144         * if reassembly_queue_length and the list are up to date
1145         */
1146        virt_wmb();
1147        info->reassembly_data_length += data_length;
1148        spin_unlock(&info->reassembly_queue_lock);
1149        info->count_reassembly_queue++;
1150        info->count_enqueue_reassembly_queue++;
1151}
1152
1153/*
1154 * Get the first entry at the front of reassembly queue
1155 * Caller is responsible for locking
1156 * return value: the first entry if any, NULL if queue is empty
1157 */
1158static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1159{
1160        struct smbd_response *ret = NULL;
1161
1162        if (!list_empty(&info->reassembly_queue)) {
1163                ret = list_first_entry(
1164                        &info->reassembly_queue,
1165                        struct smbd_response, list);
1166        }
1167        return ret;
1168}
1169
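    /*
     * Take a response buffer off the empty packet queue
     * return value: the buffer, or NULL if the queue is empty
     */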
1170static struct smbd_response *get_empty_queue_buffer(
1171                struct smbd_connection *info)
1172{
1173        struct smbd_response *ret = NULL;
1174        unsigned long flags;
1175
1176        spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1177        if (!list_empty(&info->empty_packet_queue)) {
1178                ret = list_first_entry(
1179                        &info->empty_packet_queue,
1180                        struct smbd_response, list);
1181                list_del(&ret->list);
1182                info->count_empty_packet_queue--;
1183        }
1184        spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1185
1186        return ret;
1187}
1188
1189/*
1190 * Get a receive buffer
1191 * For each remote send, we need to post a receive. The receive buffers are
1192 * preallocated when the transport is established.
1193 * return value: the receive buffer, NULL if none is available
1194 */
1195static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1196{
1197        struct smbd_response *ret = NULL;
1198        unsigned long flags;
1199
1200        spin_lock_irqsave(&info->receive_queue_lock, flags);
1201        if (!list_empty(&info->receive_queue)) {
1202                ret = list_first_entry(
1203                        &info->receive_queue,
1204                        struct smbd_response, list);
1205                list_del(&ret->list);
1206                info->count_receive_queue--;
1207                info->count_get_receive_buffer++;
1208        }
1209        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1210
1211        return ret;
1212}
1213
1214/*
1215 * Return a receive buffer
1216 * When a receive buffer is returned, we can post a new receive and extend
1217 * more receive credits to the remote peer. This is done immediately after a
1218 * receive buffer is returned.
1219 */
1220static void put_receive_buffer(
1221        struct smbd_connection *info, struct smbd_response *response)
1222{
1223        unsigned long flags;
1224
1225        ib_dma_unmap_single(info->id->device, response->sge.addr,
1226                response->sge.length, DMA_FROM_DEVICE);
1227
1228        spin_lock_irqsave(&info->receive_queue_lock, flags);
1229        list_add_tail(&response->list, &info->receive_queue);
1230        info->count_receive_queue++;
1231        info->count_put_receive_buffer++;
1232        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1233
1234        queue_work(info->workqueue, &info->post_send_credits_work);
1235}
1236
1237/* Preallocate all receive buffers on transport establishment */
1238static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1239{
1240        int i;
1241        struct smbd_response *response;
1242
1243        INIT_LIST_HEAD(&info->reassembly_queue);
1244        spin_lock_init(&info->reassembly_queue_lock);
1245        info->reassembly_data_length = 0;
1246        info->reassembly_queue_length = 0;
1247
1248        INIT_LIST_HEAD(&info->receive_queue);
1249        spin_lock_init(&info->receive_queue_lock);
1250        info->count_receive_queue = 0;
1251
1252        INIT_LIST_HEAD(&info->empty_packet_queue);
1253        spin_lock_init(&info->empty_packet_queue_lock);
1254        info->count_empty_packet_queue = 0;
1255
1256        init_waitqueue_head(&info->wait_receive_queues);
1257
1258        for (i = 0; i < num_buf; i++) {
1259                response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1260                if (!response)
1261                        goto allocate_failed;
1262
1263                response->info = info;
1264                list_add_tail(&response->list, &info->receive_queue);
1265                info->count_receive_queue++;
1266        }
1267
1268        return 0;
1269
1270allocate_failed:
1271        while (!list_empty(&info->receive_queue)) {
1272                response = list_first_entry(
1273                                &info->receive_queue,
1274                                struct smbd_response, list);
1275                list_del(&response->list);
1276                info->count_receive_queue--;
1277
1278                mempool_free(response, info->response_mempool);
1279        }
1280        return -ENOMEM;
1281}
1282
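    /* Free all receive buffers from the receive and empty packet queues */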
1283static void destroy_receive_buffers(struct smbd_connection *info)
1284{
1285        struct smbd_response *response;
1286
1287        while ((response = get_receive_buffer(info)))
1288                mempool_free(response, info->response_mempool);
1289
1290        while ((response = get_empty_queue_buffer(info)))
1291                mempool_free(response, info->response_mempool);
1292}
1293
1294/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1295static void idle_connection_timer(struct work_struct *work)
1296{
1297        struct smbd_connection *info = container_of(
1298                                        work, struct smbd_connection,
1299                                        idle_timer_work.work);
1300
1301        if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1302                log_keep_alive(ERR,
1303                        "error status info->keep_alive_requested=%d\n",
1304                        info->keep_alive_requested);
1305                smbd_disconnect_rdma_connection(info);
1306                return;
1307        }
1308
1309        log_keep_alive(INFO, "about to send an empty idle message\n");
1310        smbd_post_send_empty(info);
1311
1312        /* Setup the next idle timeout work */
1313        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1314                        info->keep_alive_interval*HZ);
1315}
1316
1317/*
1318 * Destroy the transport and related RDMA and memory resources
1319 * Need to go through all the pending counters and make sure no one is using
1320 * the transport while it is being destroyed
1321 */
1322void smbd_destroy(struct TCP_Server_Info *server)
1323{
1324        struct smbd_connection *info = server->smbd_conn;
1325        struct smbd_response *response;
1326        unsigned long flags;
1327
1328        if (!info) {
1329                log_rdma_event(INFO, "rdma session already destroyed\n");
1330                return;
1331        }
1332
1333        log_rdma_event(INFO, "destroying rdma session\n");
1334        if (info->transport_status != SMBD_DISCONNECTED) {
1335                rdma_disconnect(server->smbd_conn->id);
1336                log_rdma_event(INFO, "wait for transport being disconnected\n");
1337                wait_event_interruptible(
1338                        info->disconn_wait,
1339                        info->transport_status == SMBD_DISCONNECTED);
1340        }
1341
1342        log_rdma_event(INFO, "destroying qp\n");
1343        ib_drain_qp(info->id->qp);
1344        rdma_destroy_qp(info->id);
1345
1346        log_rdma_event(INFO, "cancelling idle timer\n");
1347        cancel_delayed_work_sync(&info->idle_timer_work);
1348
1349        log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
1350        wait_event(info->wait_send_pending,
1351                atomic_read(&info->send_pending) == 0);
1352
1353        /* It's not possible for the upper layer to get to reassembly */
1354        log_rdma_event(INFO, "drain the reassembly queue\n");
1355        do {
1356                spin_lock_irqsave(&info->reassembly_queue_lock, flags);
1357                response = _get_first_reassembly(info);
1358                if (response) {
1359                        list_del(&response->list);
1360                        spin_unlock_irqrestore(
1361                                &info->reassembly_queue_lock, flags);
1362                        put_receive_buffer(info, response);
1363                } else
1364                        spin_unlock_irqrestore(
1365                                &info->reassembly_queue_lock, flags);
1366        } while (response);
1367        info->reassembly_data_length = 0;
1368
1369        log_rdma_event(INFO, "free receive buffers\n");
1370        wait_event(info->wait_receive_queues,
1371                info->count_receive_queue + info->count_empty_packet_queue
1372                        == info->receive_credit_max);
1373        destroy_receive_buffers(info);
1374
1375        /*
1376         * For performance reasons, memory registration and deregistration
1377         * are not locked by srv_mutex. It is possible some processes are
1378         * blocked on transport srv_mutex while holding memory registration.
1379         * Release the transport srv_mutex to allow them to hit the failure
1380         * path when sending data, and then release memory registrations.
1381         */
1382        log_rdma_event(INFO, "freeing mr list\n");
1383        wake_up_interruptible_all(&info->wait_mr);
1384        while (atomic_read(&info->mr_used_count)) {
1385                mutex_unlock(&server->srv_mutex);
1386                msleep(1000);
1387                mutex_lock(&server->srv_mutex);
1388        }
1389        destroy_mr_list(info);
1390
1391        ib_free_cq(info->send_cq);
1392        ib_free_cq(info->recv_cq);
1393        ib_dealloc_pd(info->pd);
1394        rdma_destroy_id(info->id);
1395
1396        /* free mempools */
1397        mempool_destroy(info->request_mempool);
1398        kmem_cache_destroy(info->request_cache);
1399
1400        mempool_destroy(info->response_mempool);
1401        kmem_cache_destroy(info->response_cache);
1402
1403        info->transport_status = SMBD_DESTROYED;
1404
1405        destroy_workqueue(info->workqueue);
1406        log_rdma_event(INFO,  "rdma session destroyed\n");
1407        kfree(info);
1408}
1409
1410/*
1411 * Reconnect this SMBD connection, called from upper layer
1412 * return value: 0 on success, or actual error code
1413 */
1414int smbd_reconnect(struct TCP_Server_Info *server)
1415{
1416        log_rdma_event(INFO, "reconnecting rdma session\n");
1417
1418        if (!server->smbd_conn) {
1419                log_rdma_event(INFO, "rdma session already destroyed\n");
1420                goto create_conn;
1421        }
1422
1423        /*
1424         * This is possible if the transport is disconnected and we haven't received
1425         * a notification from RDMA, but the upper layer has detected a timeout
1426         */
1427        if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1428                log_rdma_event(INFO, "disconnecting transport\n");
1429                smbd_destroy(server);
1430        }
1431
1432create_conn:
1433        log_rdma_event(INFO, "creating rdma session\n");
1434        server->smbd_conn = smbd_get_connection(
1435                server, (struct sockaddr *) &server->dstaddr);
1436
1437        if (server->smbd_conn)
1438                cifs_dbg(VFS, "RDMA transport re-established\n");
1439
1440        return server->smbd_conn ? 0 : -ENOENT;
1441}
1442
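    /*
     * Free the receive buffers, workqueue, mempools and slab caches created by
     * allocate_caches_and_workqueue(), in reverse order of allocation.
     */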
1443static void destroy_caches_and_workqueue(struct smbd_connection *info)
1444{
1445        destroy_receive_buffers(info);
1446        destroy_workqueue(info->workqueue);
1447        mempool_destroy(info->response_mempool);
1448        kmem_cache_destroy(info->response_cache);
1449        mempool_destroy(info->request_mempool);
1450        kmem_cache_destroy(info->request_cache);
1451}
1452
1453#define MAX_NAME_LEN    80
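    /*
     * Allocate the slab caches and mempools used for send requests and receive
     * responses (sized to the configured credits), a per-connection workqueue,
     * and the initial pool of receive buffers.
     */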
1454static int allocate_caches_and_workqueue(struct smbd_connection *info)
1455{
1456        char name[MAX_NAME_LEN];
1457        int rc;
1458
1459        scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1460        info->request_cache =
1461                kmem_cache_create(
1462                        name,
1463                        sizeof(struct smbd_request) +
1464                                sizeof(struct smbd_data_transfer),
1465                        0, SLAB_HWCACHE_ALIGN, NULL);
1466        if (!info->request_cache)
1467                return -ENOMEM;
1468
1469        info->request_mempool =
1470                mempool_create(info->send_credit_target, mempool_alloc_slab,
1471                        mempool_free_slab, info->request_cache);
1472        if (!info->request_mempool)
1473                goto out1;
1474
1475        scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1476        info->response_cache =
1477                kmem_cache_create(
1478                        name,
1479                        sizeof(struct smbd_response) +
1480                                info->max_receive_size,
1481                        0, SLAB_HWCACHE_ALIGN, NULL);
1482        if (!info->response_cache)
1483                goto out2;
1484
1485        info->response_mempool =
1486                mempool_create(info->receive_credit_max, mempool_alloc_slab,
1487                       mempool_free_slab, info->response_cache);
1488        if (!info->response_mempool)
1489                goto out3;
1490
1491        scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1492        info->workqueue = create_workqueue(name);
1493        if (!info->workqueue)
1494                goto out4;
1495
1496        rc = allocate_receive_buffers(info, info->receive_credit_max);
1497        if (rc) {
1498                log_rdma_event(ERR, "failed to allocate receive buffers\n");
1499                goto out5;
1500        }
1501
1502        return 0;
1503
1504out5:
1505        destroy_workqueue(info->workqueue);
1506out4:
1507        mempool_destroy(info->response_mempool);
1508out3:
1509        kmem_cache_destroy(info->response_cache);
1510out2:
1511        mempool_destroy(info->request_mempool);
1512out1:
1513        kmem_cache_destroy(info->request_cache);
1514        return -ENOMEM;
1515}
1516
1517/* Create a SMBD connection, called by upper layer */
1518static struct smbd_connection *_smbd_get_connection(
1519        struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1520{
1521        int rc;
1522        struct smbd_connection *info;
1523        struct rdma_conn_param conn_param;
1524        struct ib_qp_init_attr qp_attr;
1525        struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1526        struct ib_port_immutable port_immutable;
1527        u32 ird_ord_hdr[2];
1528
1529        info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1530        if (!info)
1531                return NULL;
1532
1533        info->transport_status = SMBD_CONNECTING;
1534        rc = smbd_ia_open(info, dstaddr, port);
1535        if (rc) {
1536                log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1537                goto create_id_failed;
1538        }
1539
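            /*
             * The configured credits determine the CQ and QP sizes created
             * below; reject values that exceed what the device can support.
             */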
1540        if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1541            smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1542                log_rdma_event(ERR, "consider lowering send_credit_target = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
1543                               smbd_send_credit_target,
1544                               info->id->device->attrs.max_cqe,
1545                               info->id->device->attrs.max_qp_wr);
1546                goto config_failed;
1547        }
1548
1549        if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1550            smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1551                log_rdma_event(ERR, "consider lowering receive_credit_max = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
1552                               smbd_receive_credit_max,
1553                               info->id->device->attrs.max_cqe,
1554                               info->id->device->attrs.max_qp_wr);
1555                goto config_failed;
1556        }
1557
1558        info->receive_credit_max = smbd_receive_credit_max;
1559        info->send_credit_target = smbd_send_credit_target;
1560        info->max_send_size = smbd_max_send_size;
1561        info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1562        info->max_receive_size = smbd_max_receive_size;
1563        info->keep_alive_interval = smbd_keep_alive_interval;
1564
1565        if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
1566                log_rdma_event(ERR,
1567                        "warning: device max_send_sge = %d too small\n",
1568                        info->id->device->attrs.max_send_sge);
1569                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1570        }
1571        if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
1572                log_rdma_event(ERR,
1573                        "warning: device max_recv_sge = %d too small\n",
1574                        info->id->device->attrs.max_recv_sge);
1575                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1576        }
1577
1578        info->send_cq = NULL;
1579        info->recv_cq = NULL;
1580        info->send_cq =
1581                ib_alloc_cq_any(info->id->device, info,
1582                                info->send_credit_target, IB_POLL_SOFTIRQ);
1583        if (IS_ERR(info->send_cq)) {
1584                info->send_cq = NULL;
1585                goto alloc_cq_failed;
1586        }
1587
1588        info->recv_cq =
1589                ib_alloc_cq_any(info->id->device, info,
1590                                info->receive_credit_max, IB_POLL_SOFTIRQ);
1591        if (IS_ERR(info->recv_cq)) {
1592                info->recv_cq = NULL;
1593                goto alloc_cq_failed;
1594        }
1595
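            /*
             * Create a reliably-connected (RC) QP sized to the send/receive
             * credit limits, with completions routed to the CQs above.
             */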
1596        memset(&qp_attr, 0, sizeof(qp_attr));
1597        qp_attr.event_handler = smbd_qp_async_error_upcall;
1598        qp_attr.qp_context = info;
1599        qp_attr.cap.max_send_wr = info->send_credit_target;
1600        qp_attr.cap.max_recv_wr = info->receive_credit_max;
1601        qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1602        qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1603        qp_attr.cap.max_inline_data = 0;
1604        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1605        qp_attr.qp_type = IB_QPT_RC;
1606        qp_attr.send_cq = info->send_cq;
1607        qp_attr.recv_cq = info->recv_cq;
1608        qp_attr.port_num = ~0;
1609
1610        rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1611        if (rc) {
1612                log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1613                goto create_qp_failed;
1614        }
1615
1616        memset(&conn_param, 0, sizeof(conn_param));
1617        conn_param.initiator_depth = 0;
1618
1619        conn_param.responder_resources =
1620                info->id->device->attrs.max_qp_rd_atom
1621                        < SMBD_CM_RESPONDER_RESOURCES ?
1622                info->id->device->attrs.max_qp_rd_atom :
1623                SMBD_CM_RESPONDER_RESOURCES;
1624        info->responder_resources = conn_param.responder_resources;
1625        log_rdma_mr(INFO, "responder_resources=%d\n",
1626                info->responder_resources);
1627
1628        /* Need to send IRD/ORD in private data for iWARP */
1629        info->id->device->ops.get_port_immutable(
1630                info->id->device, info->id->port_num, &port_immutable);
1631        if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1632                ird_ord_hdr[0] = info->responder_resources;
1633                ird_ord_hdr[1] = 1;
1634                conn_param.private_data = ird_ord_hdr;
1635                conn_param.private_data_len = sizeof(ird_ord_hdr);
1636        } else {
1637                conn_param.private_data = NULL;
1638                conn_param.private_data_len = 0;
1639        }
1640
1641        conn_param.retry_count = SMBD_CM_RETRY;
1642        conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1643        conn_param.flow_control = 0;
1644
1645        log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1646                &addr_in->sin_addr, port);
1647
1648        init_waitqueue_head(&info->conn_wait);
1649        init_waitqueue_head(&info->disconn_wait);
1650        init_waitqueue_head(&info->wait_reassembly_queue);
1651        rc = rdma_connect(info->id, &conn_param);
1652        if (rc) {
1653                log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1654                goto rdma_connect_failed;
1655        }
1656
1657        wait_event_interruptible(
1658                info->conn_wait, info->transport_status != SMBD_CONNECTING);
1659
1660        if (info->transport_status != SMBD_CONNECTED) {
1661                log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1662                goto rdma_connect_failed;
1663        }
1664
1665        log_rdma_event(INFO, "rdma_connect connected\n");
1666
1667        rc = allocate_caches_and_workqueue(info);
1668        if (rc) {
1669                log_rdma_event(ERR, "cache allocation failed\n");
1670                goto allocate_cache_failed;
1671        }
1672
1673        init_waitqueue_head(&info->wait_send_queue);
1674        INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1675        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1676                info->keep_alive_interval*HZ);
1677
1678        init_waitqueue_head(&info->wait_send_pending);
1679        atomic_set(&info->send_pending, 0);
1680
1681        init_waitqueue_head(&info->wait_post_send);
1682
1683        INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1684        INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1685        info->new_credits_offered = 0;
1686        spin_lock_init(&info->lock_new_credits_offered);
1687
1688        rc = smbd_negotiate(info);
1689        if (rc) {
1690                log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1691                goto negotiation_failed;
1692        }
1693
1694        rc = allocate_mr_list(info);
1695        if (rc) {
1696                log_rdma_mr(ERR, "memory registration allocation failed\n");
1697                goto allocate_mr_failed;
1698        }
1699
1700        return info;
1701
1702allocate_mr_failed:
1703        /* At this point, we need a full transport shutdown */
1704        smbd_destroy(server);
1705        return NULL;
1706
1707negotiation_failed:
1708        cancel_delayed_work_sync(&info->idle_timer_work);
1709        destroy_caches_and_workqueue(info);
1710        info->transport_status = SMBD_NEGOTIATE_FAILED;
1711        init_waitqueue_head(&info->conn_wait);
1712        rdma_disconnect(info->id);
1713        wait_event(info->conn_wait,
1714                info->transport_status == SMBD_DISCONNECTED);
1715
1716allocate_cache_failed:
1717rdma_connect_failed:
1718        rdma_destroy_qp(info->id);
1719
1720create_qp_failed:
1721alloc_cq_failed:
1722        if (info->send_cq)
1723                ib_free_cq(info->send_cq);
1724        if (info->recv_cq)
1725                ib_free_cq(info->recv_cq);
1726
1727config_failed:
1728        ib_dealloc_pd(info->pd);
1729        rdma_destroy_id(info->id);
1730
1731create_id_failed:
1732        kfree(info);
1733        return NULL;
1734}
1735
1736struct smbd_connection *smbd_get_connection(
1737        struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1738{
1739        struct smbd_connection *ret;
1740        int port = SMBD_PORT;
1741
1742try_again:
1743        ret = _smbd_get_connection(server, dstaddr, port);
1744
1745        /* Try SMB_PORT if SMBD_PORT doesn't work */
1746        if (!ret && port == SMBD_PORT) {
1747                port = SMB_PORT;
1748                goto try_again;
1749        }
1750        return ret;
1751}
1752
1753/*
1754 * Receive data from the receive reassembly queue
1755 * All the incoming data packets are placed in the reassembly queue
1756 * buf: the buffer to read data into
1757 * size: the length of data to read
1758 * return value: actual data read
1759 * Note: this implementation copies the data from the reassembly queue to the
1760 * receive buffers used by the upper layer. This is not the optimal code path.
1761 * A better way is to not have the upper layer allocate its receive buffers but
1762 * rather borrow the buffer from the reassembly queue, and return it after the
1763 * data is consumed. But this requires more changes to the upper layer code, and
1764 * also needs to consider packet boundaries while they are still being reassembled.
1765 */
1766static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1767                unsigned int size)
1768{
1769        struct smbd_response *response;
1770        struct smbd_data_transfer *data_transfer;
1771        int to_copy, to_read, data_read, offset;
1772        u32 data_length, remaining_data_length, data_offset;
1773        int rc;
1774
1775again:
1776        /*
1777         * No need to hold the reassembly queue lock all the time as we are
1778         * the only one reading from the front of the queue. The transport
1779         * may add more entries to the back of the queue at the same time
1780         */
1781        log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1782                info->reassembly_data_length);
1783        if (info->reassembly_data_length >= size) {
1784                int queue_length;
1785                int queue_removed = 0;
1786
1787                /*
1788                 * Need to make sure reassembly_data_length is read before
1789                 * reading reassembly_queue_length and calling
1790                 * _get_first_reassembly. This call is lock free
1791                 * as we never read the end of the queue, which is being
1792                 * updated in SOFTIRQ context as more data is received
1793                 */
1794                virt_rmb();
1795                queue_length = info->reassembly_queue_length;
1796                data_read = 0;
1797                to_read = size;
1798                offset = info->first_entry_offset;
1799                while (data_read < size) {
1800                        response = _get_first_reassembly(info);
1801                        data_transfer = smbd_response_payload(response);
1802                        data_length = le32_to_cpu(data_transfer->data_length);
1803                        remaining_data_length =
1804                                le32_to_cpu(
1805                                        data_transfer->remaining_data_length);
1806                        data_offset = le32_to_cpu(data_transfer->data_offset);
1807
1808                        /*
1809                         * The upper layer expects the RFC1002 length at the
1810                         * beginning of the payload. Return it to indicate
1811                         * the total length of the packet. This minimizes the
1812                         * change to the upper layer packet processing logic.
1813                         * This will eventually be removed when an intermediate
1814                         * transport layer is added
1815                         */
1816                        if (response->first_segment && size == 4) {
1817                                unsigned int rfc1002_len =
1818                                        data_length + remaining_data_length;
1819                                *((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1820                                data_read = 4;
1821                                response->first_segment = false;
1822                                log_read(INFO, "returning rfc1002 length %d\n",
1823                                        rfc1002_len);
1824                                goto read_rfc1002_done;
1825                        }
1826
1827                        to_copy = min_t(int, data_length - offset, to_read);
1828                        memcpy(
1829                                buf + data_read,
1830                                (char *)data_transfer + data_offset + offset,
1831                                to_copy);
1832
1833                        /* move on to the next buffer? */
1834                        if (to_copy == data_length - offset) {
1835                                queue_length--;
1836                                /*
1837                                 * No need to lock if we are not at the
1838                                 * end of the queue
1839                                 */
1840                                if (queue_length)
1841                                        list_del(&response->list);
1842                                else {
1843                                        spin_lock_irq(
1844                                                &info->reassembly_queue_lock);
1845                                        list_del(&response->list);
1846                                        spin_unlock_irq(
1847                                                &info->reassembly_queue_lock);
1848                                }
1849                                queue_removed++;
1850                                info->count_reassembly_queue--;
1851                                info->count_dequeue_reassembly_queue++;
1852                                put_receive_buffer(info, response);
1853                                offset = 0;
1854                                log_read(INFO, "put_receive_buffer offset=0\n");
1855                        } else
1856                                offset += to_copy;
1857
1858                        to_read -= to_copy;
1859                        data_read += to_copy;
1860
1861                        log_read(INFO, "_get_first_reassembly memcpy %d bytes data_transfer_length-offset=%d after that to_read=%d data_read=%d offset=%d\n",
1862                                 to_copy, data_length - offset,
1863                                 to_read, data_read, offset);
1864                }
1865
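                    /*
                     * Update the receive counters under the lock once for the
                     * whole batch of dequeued packets, instead of per packet.
                     */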
1866                spin_lock_irq(&info->reassembly_queue_lock);
1867                info->reassembly_data_length -= data_read;
1868                info->reassembly_queue_length -= queue_removed;
1869                spin_unlock_irq(&info->reassembly_queue_lock);
1870
1871                info->first_entry_offset = offset;
1872                log_read(INFO, "returning to thread data_read=%d reassembly_data_length=%d first_entry_offset=%d\n",
1873                         data_read, info->reassembly_data_length,
1874                         info->first_entry_offset);
1875read_rfc1002_done:
1876                return data_read;
1877        }
1878
1879        log_read(INFO, "wait_event on more data\n");
1880        rc = wait_event_interruptible(
1881                info->wait_reassembly_queue,
1882                info->reassembly_data_length >= size ||
1883                        info->transport_status != SMBD_CONNECTED);
1884        /* Don't return any data if interrupted */
1885        if (rc)
1886                return rc;
1887
1888        if (info->transport_status != SMBD_CONNECTED) {
1889                log_read(ERR, "disconnected\n");
1890                return -ECONNABORTED;
1891        }
1892
1893        goto again;
1894}
1895
1896/*
1897 * Receive a page from receive reassembly queue
1898 * page, page_offset: the page and the offset within it to read data into
1899 * to_read: the length of data to read
1900 * return value: actual data read
1901 */
1902static int smbd_recv_page(struct smbd_connection *info,
1903                struct page *page, unsigned int page_offset,
1904                unsigned int to_read)
1905{
1906        int ret;
1907        char *to_address;
1908        void *page_address;
1909
1910        /* make sure we have the page ready for read */
1911        ret = wait_event_interruptible(
1912                info->wait_reassembly_queue,
1913                info->reassembly_data_length >= to_read ||
1914                        info->transport_status != SMBD_CONNECTED);
1915        if (ret)
1916                return ret;
1917
1918        /* now we can read from reassembly queue and not sleep */
1919        page_address = kmap_atomic(page);
1920        to_address = (char *) page_address + page_offset;
1921
1922        log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
1923                page, to_address, to_read);
1924
1925        ret = smbd_recv_buf(info, to_address, to_read);
1926        kunmap_atomic(page_address);
1927
1928        return ret;
1929}
1930
1931/*
1932 * Receive data from transport
1933 * msg: a msghdr pointing to the buffer, can be ITER_KVEC or ITER_BVEC
1934 * return: total bytes read, or 0. SMB Direct will not do partial read.
1935 */
1936int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
1937{
1938        char *buf;
1939        struct page *page;
1940        unsigned int to_read, page_offset;
1941        int rc;
1942
1943        if (iov_iter_rw(&msg->msg_iter) == WRITE) {
1944                /* It's a bug in the upper layer to get here */
1945                cifs_dbg(VFS, "Invalid msg iter dir %u\n",
1946                         iov_iter_rw(&msg->msg_iter));
1947                rc = -EINVAL;
1948                goto out;
1949        }
1950
1951        switch (iov_iter_type(&msg->msg_iter)) {
1952        case ITER_KVEC:
1953                buf = msg->msg_iter.kvec->iov_base;
1954                to_read = msg->msg_iter.kvec->iov_len;
1955                rc = smbd_recv_buf(info, buf, to_read);
1956                break;
1957
1958        case ITER_BVEC:
1959                page = msg->msg_iter.bvec->bv_page;
1960                page_offset = msg->msg_iter.bvec->bv_offset;
1961                to_read = msg->msg_iter.bvec->bv_len;
1962                rc = smbd_recv_page(info, page, page_offset, to_read);
1963                break;
1964
1965        default:
1966                /* It's a bug in the upper layer to get here */
1967                cifs_dbg(VFS, "Invalid msg type %d\n",
1968                         iov_iter_type(&msg->msg_iter));
1969                rc = -EINVAL;
1970        }
1971
1972out:
1973        /* SMBDirect will read it all or nothing */
1974        if (rc > 0)
1975                msg->msg_iter.count = 0;
1976        return rc;
1977}
1978
1979/*
1980 * Send data to transport
1981 * Each rqst is transported as a SMBDirect payload
1982 * rqst: the data to write
1983 * return value: 0 on successful write, otherwise error code
1984 */
1985int smbd_send(struct TCP_Server_Info *server,
1986        int num_rqst, struct smb_rqst *rqst_array)
1987{
1988        struct smbd_connection *info = server->smbd_conn;
1989        struct kvec vec;
1990        int nvecs;
1991        int size;
1992        unsigned int buflen, remaining_data_length;
1993        int start, i, j;
1994        int max_iov_size =
1995                info->max_send_size - sizeof(struct smbd_data_transfer);
1996        struct kvec *iov;
1997        int rc;
1998        struct smb_rqst *rqst;
1999        int rqst_idx;
2000
2001        if (info->transport_status != SMBD_CONNECTED) {
2002                rc = -EAGAIN;
2003                goto done;
2004        }
2005
2006        /*
2007         * Add in the page array if there is one. The caller needs to set
2008         * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
2009         * ends at a page boundary
2010         */
2011        remaining_data_length = 0;
2012        for (i = 0; i < num_rqst; i++)
2013                remaining_data_length += smb_rqst_len(server, &rqst_array[i]);
2014
2015        if (remaining_data_length > info->max_fragmented_send_size) {
2016                log_write(ERR, "payload size %d > max size %d\n",
2017                        remaining_data_length, info->max_fragmented_send_size);
2018                rc = -EINVAL;
2019                goto done;
2020        }
2021
2022        log_write(INFO, "num_rqst=%d total length=%u\n",
2023                        num_rqst, remaining_data_length);
2024
2025        rqst_idx = 0;
2026next_rqst:
2027        rqst = &rqst_array[rqst_idx];
2028        iov = rqst->rq_iov;
2029
2030        cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
2031                rqst_idx, smb_rqst_len(server, rqst));
2032        for (i = 0; i < rqst->rq_nvec; i++)
2033                dump_smb(iov[i].iov_base, iov[i].iov_len);
2034
2035
2036        log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d rq_tailsz=%d buflen=%lu\n",
2037                  rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2038                  rqst->rq_tailsz, smb_rqst_len(server, rqst));
2039
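            /*
             * Walk rq_iov and pack consecutive vectors into SMBD sends of at
             * most max_iov_size bytes; a single vector larger than
             * max_iov_size is split across multiple sends.
             */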
2040        start = i = 0;
2041        buflen = 0;
2042        while (true) {
2043                buflen += iov[i].iov_len;
2044                if (buflen > max_iov_size) {
2045                        if (i > start) {
2046                                remaining_data_length -=
2047                                        (buflen-iov[i].iov_len);
2048                                log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2049                                          start, i, i - start,
2050                                          remaining_data_length);
2051                                rc = smbd_post_send_data(
2052                                        info, &iov[start], i-start,
2053                                        remaining_data_length);
2054                                if (rc)
2055                                        goto done;
2056                        } else {
2057                                /* iov[start] is too big, break it */
2058                                nvecs = (buflen+max_iov_size-1)/max_iov_size;
2059                                log_write(INFO, "iov[%d] iov_base=%p buflen=%d break to %d vectors\n",
2060                                          start, iov[start].iov_base,
2061                                          buflen, nvecs);
2062                                for (j = 0; j < nvecs; j++) {
2063                                        vec.iov_base =
2064                                                (char *)iov[start].iov_base +
2065                                                j*max_iov_size;
2066                                        vec.iov_len = max_iov_size;
2067                                        if (j == nvecs-1)
2068                                                vec.iov_len =
2069                                                        buflen -
2070                                                        max_iov_size*(nvecs-1);
2071                                        remaining_data_length -= vec.iov_len;
2072                                        log_write(INFO,
2073                                                "sending vec j=%d iov_base=%p iov_len=%zu remaining_data_length=%d\n",
2074                                                  j, vec.iov_base, vec.iov_len,
2075                                                  remaining_data_length);
2076                                        rc = smbd_post_send_data(
2077                                                info, &vec, 1,
2078                                                remaining_data_length);
2079                                        if (rc)
2080                                                goto done;
2081                                }
2082                                i++;
2083                                if (i == rqst->rq_nvec)
2084                                        break;
2085                        }
2086                        start = i;
2087                        buflen = 0;
2088                } else {
2089                        i++;
2090                        if (i == rqst->rq_nvec) {
2091                                /* send out all remaining vecs */
2092                                remaining_data_length -= buflen;
2093                                log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2094                                          start, i, i - start,
2095                                          remaining_data_length);
2096                                rc = smbd_post_send_data(info, &iov[start],
2097                                        i-start, remaining_data_length);
2098                                if (rc)
2099                                        goto done;
2100                                break;
2101                        }
2102                }
2103                log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2104        }
2105
2106        /* now sending pages if there are any */
2107        for (i = 0; i < rqst->rq_npages; i++) {
2108                unsigned int offset;
2109
2110                rqst_page_get_length(rqst, i, &buflen, &offset);
2111                nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2112                log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2113                        buflen, nvecs);
2114                for (j = 0; j < nvecs; j++) {
2115                        size = max_iov_size;
2116                        if (j == nvecs-1)
2117                                size = buflen - j*max_iov_size;
2118                        remaining_data_length -= size;
2119                        log_write(INFO, "sending pages i=%d offset=%d size=%d remaining_data_length=%d\n",
2120                                  i, j * max_iov_size + offset, size,
2121                                  remaining_data_length);
2122                        rc = smbd_post_send_page(
2123                                info, rqst->rq_pages[i],
2124                                j*max_iov_size + offset,
2125                                size, remaining_data_length);
2126                        if (rc)
2127                                goto done;
2128                }
2129        }
2130
2131        rqst_idx++;
2132        if (rqst_idx < num_rqst)
2133                goto next_rqst;
2134
2135done:
2136        /*
2137         * As an optimization, we don't wait for each individual I/O to finish
2138         * before sending the next one.
2139         * Send them all and wait for the pending send count to reach 0,
2140         * which means all the I/Os have been sent out and we are good to return
2141         */
2142
2143        wait_event(info->wait_send_pending,
2144                atomic_read(&info->send_pending) == 0);
2145
2146        return rc;
2147}
2148
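    /*
     * Completion of a fast-registration (IB_WR_REG_MR) work request. A failed
     * registration is treated as fatal and the RDMA connection is torn down.
     */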
2149static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2150{
2151        struct smbd_mr *mr;
2152        struct ib_cqe *cqe;
2153
2154        if (wc->status) {
2155                log_rdma_mr(ERR, "status=%d\n", wc->status);
2156                cqe = wc->wr_cqe;
2157                mr = container_of(cqe, struct smbd_mr, cqe);
2158                smbd_disconnect_rdma_connection(mr->conn);
2159        }
2160}
2161
2162/*
2163 * The work queue function that recovers MRs
2164 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2165 * again. Both calls are slow, so finish them in a workqueue. This will not
2166 * block the I/O path.
2167 * There is one workqueue that recovers MRs, so there is no need to lock, as the
2168 * I/O requests calling smbd_register_mr() never update the links in the
2169 * mr_list.
2170 */
2171static void smbd_mr_recovery_work(struct work_struct *work)
2172{
2173        struct smbd_connection *info =
2174                container_of(work, struct smbd_connection, mr_recovery_work);
2175        struct smbd_mr *smbdirect_mr;
2176        int rc;
2177
2178        list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2179                if (smbdirect_mr->state == MR_ERROR) {
2180
2181                        /* recover this MR entry */
2182                        rc = ib_dereg_mr(smbdirect_mr->mr);
2183                        if (rc) {
2184                                log_rdma_mr(ERR,
2185                                        "ib_dereg_mr failed rc=%x\n",
2186                                        rc);
2187                                smbd_disconnect_rdma_connection(info);
2188                                continue;
2189                        }
2190
2191                        smbdirect_mr->mr = ib_alloc_mr(
2192                                info->pd, info->mr_type,
2193                                info->max_frmr_depth);
2194                        if (IS_ERR(smbdirect_mr->mr)) {
2195                                log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2196                                            info->mr_type,
2197                                            info->max_frmr_depth);
2198                                smbd_disconnect_rdma_connection(info);
2199                                continue;
2200                        }
2201                } else
2202                        /* This MR is being used, don't recover it */
2203                        continue;
2204
2205                smbdirect_mr->state = MR_READY;
2206
2207                /* smbdirect_mr->state is updated by this function
2208                 * and is read and updated by the I/O issuing CPUs trying
2209                 * to get a MR. The call to atomic_inc_return
2210                 * implies a memory barrier and guarantees that this
2211                 * value is updated before waking up any calls to
2212                 * get_mr() from the I/O issuing CPUs
2213                 */
2214                if (atomic_inc_return(&info->mr_ready_count) == 1)
2215                        wake_up_interruptible(&info->wait_mr);
2216        }
2217}
2218
2219static void destroy_mr_list(struct smbd_connection *info)
2220{
2221        struct smbd_mr *mr, *tmp;
2222
2223        cancel_work_sync(&info->mr_recovery_work);
2224        list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2225                if (mr->state == MR_INVALIDATED)
2226                        ib_dma_unmap_sg(info->id->device, mr->sgl,
2227                                mr->sgl_count, mr->dir);
2228                ib_dereg_mr(mr->mr);
2229                kfree(mr->sgl);
2230                kfree(mr);
2231        }
2232}
2233
2234/*
2235 * Allocate MRs used for RDMA read/write
2236 * The number of MRs will not exceed the hardware capability reported in
2237 * responder_resources. All MRs are kept in mr_list and can be recovered after use.
2238 * Recovery is done in smbd_mr_recovery_work. The content of a list entry changes
2239 * as MRs are used and recovered for I/O, but the list links do not change
2240 */
2241static int allocate_mr_list(struct smbd_connection *info)
2242{
2243        int i;
2244        struct smbd_mr *smbdirect_mr, *tmp;
2245
2246        INIT_LIST_HEAD(&info->mr_list);
2247        init_waitqueue_head(&info->wait_mr);
2248        spin_lock_init(&info->mr_list_lock);
2249        atomic_set(&info->mr_ready_count, 0);
2250        atomic_set(&info->mr_used_count, 0);
2251        init_waitqueue_head(&info->wait_for_mr_cleanup);
2252        /* Allocate more MRs (2x) than hardware responder_resources */
2253        for (i = 0; i < info->responder_resources * 2; i++) {
2254                smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2255                if (!smbdirect_mr)
2256                        goto out;
2257                smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2258                                        info->max_frmr_depth);
2259                if (IS_ERR(smbdirect_mr->mr)) {
2260                        log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2261                                    info->mr_type, info->max_frmr_depth);
2262                        goto out;
2263                }
2264                smbdirect_mr->sgl = kcalloc(
2265                                        info->max_frmr_depth,
2266                                        sizeof(struct scatterlist),
2267                                        GFP_KERNEL);
2268                if (!smbdirect_mr->sgl) {
2269                        log_rdma_mr(ERR, "failed to allocate sgl\n");
2270                        ib_dereg_mr(smbdirect_mr->mr);
2271                        goto out;
2272                }
2273                smbdirect_mr->state = MR_READY;
2274                smbdirect_mr->conn = info;
2275
2276                list_add_tail(&smbdirect_mr->list, &info->mr_list);
2277                atomic_inc(&info->mr_ready_count);
2278        }
2279        INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2280        return 0;
2281
2282out:
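            /*
             * Clean up: free the entry that failed (not yet on the list, may be
             * NULL) and then every MR already added to mr_list.
             */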
2283        kfree(smbdirect_mr);
2284
2285        list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2286                ib_dereg_mr(smbdirect_mr->mr);
2287                kfree(smbdirect_mr->sgl);
2288                kfree(smbdirect_mr);
2289        }
2290        return -ENOMEM;
2291}
2292
2293/*
2294 * Get a MR from mr_list. This function waits until there is at least one
2295 * MR available in the list. It may access the list while
2296 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2297 * as they never modify the same places. However, several CPUs may be
2298 * issuing I/O and trying to get a MR at the same time; mr_list_lock is used
2299 * to protect against this situation.
2300 */
2301static struct smbd_mr *get_mr(struct smbd_connection *info)
2302{
2303        struct smbd_mr *ret;
2304        int rc;
2305again:
2306        rc = wait_event_interruptible(info->wait_mr,
2307                atomic_read(&info->mr_ready_count) ||
2308                info->transport_status != SMBD_CONNECTED);
2309        if (rc) {
2310                log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2311                return NULL;
2312        }
2313
2314        if (info->transport_status != SMBD_CONNECTED) {
2315                log_rdma_mr(ERR, "info->transport_status=%x\n",
2316                        info->transport_status);
2317                return NULL;
2318        }
2319
2320        spin_lock(&info->mr_list_lock);
2321        list_for_each_entry(ret, &info->mr_list, list) {
2322                if (ret->state == MR_READY) {
2323                        ret->state = MR_REGISTERED;
2324                        spin_unlock(&info->mr_list_lock);
2325                        atomic_dec(&info->mr_ready_count);
2326                        atomic_inc(&info->mr_used_count);
2327                        return ret;
2328                }
2329        }
2330
2331        spin_unlock(&info->mr_list_lock);
2332        /*
2333         * It is possible that we could fail to get a MR because other processes
2334         * may try to acquire a MR at the same time. If this is the case, retry.
2335         */
2336        goto again;
2337}
2338
2339/*
2340 * Register memory for RDMA read/write
2341 * pages[]: the list of pages to register memory with
2342 * num_pages: the number of pages to register
2343 * tailsz: if non-zero, the bytes to register in the last page
2344 * writing: true if this is a RDMA write (SMB read), false for RDMA read
2345 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2346 * return value: the MR registered, NULL if failed.
2347 */
2348struct smbd_mr *smbd_register_mr(
2349        struct smbd_connection *info, struct page *pages[], int num_pages,
2350        int offset, int tailsz, bool writing, bool need_invalidate)
2351{
2352        struct smbd_mr *smbdirect_mr;
2353        int rc, i;
2354        enum dma_data_direction dir;
2355        struct ib_reg_wr *reg_wr;
2356
2357        if (num_pages > info->max_frmr_depth) {
2358                log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2359                        num_pages, info->max_frmr_depth);
2360                return NULL;
2361        }
2362
2363        smbdirect_mr = get_mr(info);
2364        if (!smbdirect_mr) {
2365                log_rdma_mr(ERR, "get_mr returning NULL\n");
2366                return NULL;
2367        }
2368        smbdirect_mr->need_invalidate = need_invalidate;
2369        smbdirect_mr->sgl_count = num_pages;
2370        sg_init_table(smbdirect_mr->sgl, num_pages);
2371
2372        log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
2373                        num_pages, offset, tailsz);
2374
2375        if (num_pages == 1) {
2376                sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
2377                goto skip_multiple_pages;
2378        }
2379
2380        /* We have at least two pages to register */
2381        sg_set_page(
2382                &smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
2383        i = 1;
2384        while (i < num_pages - 1) {
2385                sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2386                i++;
2387        }
2388        sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2389                tailsz ? tailsz : PAGE_SIZE, 0);
2390
2391skip_multiple_pages:
2392        dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2393        smbdirect_mr->dir = dir;
2394        rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2395        if (!rc) {
2396                log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2397                        num_pages, dir, rc);
2398                goto dma_map_error;
2399        }
2400
2401        rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2402                NULL, PAGE_SIZE);
2403        if (rc != num_pages) {
2404                log_rdma_mr(ERR,
2405                        "ib_map_mr_sg failed rc = %d num_pages = %x\n",
2406                        rc, num_pages);
2407                goto map_mr_error;
2408        }
2409
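            /*
             * Build and post the IB_WR_REG_MR work request with a freshly
             * incremented rkey covering the mapped pages.
             */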
2410        ib_update_fast_reg_key(smbdirect_mr->mr,
2411                ib_inc_rkey(smbdirect_mr->mr->rkey));
2412        reg_wr = &smbdirect_mr->wr;
2413        reg_wr->wr.opcode = IB_WR_REG_MR;
2414        smbdirect_mr->cqe.done = register_mr_done;
2415        reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2416        reg_wr->wr.num_sge = 0;
2417        reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2418        reg_wr->mr = smbdirect_mr->mr;
2419        reg_wr->key = smbdirect_mr->mr->rkey;
2420        reg_wr->access = writing ?
2421                        IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2422                        IB_ACCESS_REMOTE_READ;
2423
2424        /*
2425         * There is no need to wait for completion of the ib_post_send
2426         * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2427         * on the next ib_post_send when we actually send I/O to the remote peer
2428         */
2429        rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
2430        if (!rc)
2431                return smbdirect_mr;
2432
2433        log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2434                rc, reg_wr->key);
2435
2436        /* If all failed, attempt to recover this MR by setting it to MR_ERROR */
2437map_mr_error:
2438        ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2439                smbdirect_mr->sgl_count, smbdirect_mr->dir);
2440
2441dma_map_error:
2442        smbdirect_mr->state = MR_ERROR;
2443        if (atomic_dec_and_test(&info->mr_used_count))
2444                wake_up(&info->wait_for_mr_cleanup);
2445
2446        smbd_disconnect_rdma_connection(info);
2447
2448        return NULL;
2449}
2450
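    /*
     * Completion of an IB_WR_LOCAL_INV work request posted by
     * smbd_deregister_mr(); wakes up the waiter via invalidate_done.
     */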
2451static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2452{
2453        struct smbd_mr *smbdirect_mr;
2454        struct ib_cqe *cqe;
2455
2456        cqe = wc->wr_cqe;
2457        smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2458        smbdirect_mr->state = MR_INVALIDATED;
2459        if (wc->status != IB_WC_SUCCESS) {
2460                log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2461                smbdirect_mr->state = MR_ERROR;
2462        }
2463        complete(&smbdirect_mr->invalidate_done);
2464}
2465
2466/*
2467 * Deregister a MR after I/O is done
2468 * This function may wait if remote invalidation is not used
2469 * and we have to locally invalidate the buffer to prevent data from being
2470 * modified by the remote peer after the upper layer consumes it
2471 */
2472int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2473{
2474        struct ib_send_wr *wr;
2475        struct smbd_connection *info = smbdirect_mr->conn;
2476        int rc = 0;
2477
2478        if (smbdirect_mr->need_invalidate) {
2479                /* Need to finish local invalidation before returning */
2480                wr = &smbdirect_mr->inv_wr;
2481                wr->opcode = IB_WR_LOCAL_INV;
2482                smbdirect_mr->cqe.done = local_inv_done;
2483                wr->wr_cqe = &smbdirect_mr->cqe;
2484                wr->num_sge = 0;
2485                wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2486                wr->send_flags = IB_SEND_SIGNALED;
2487
2488                init_completion(&smbdirect_mr->invalidate_done);
2489                rc = ib_post_send(info->id->qp, wr, NULL);
2490                if (rc) {
2491                        log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2492                        smbd_disconnect_rdma_connection(info);
2493                        goto done;
2494                }
2495                wait_for_completion(&smbdirect_mr->invalidate_done);
2496                smbdirect_mr->need_invalidate = false;
2497        } else
2498                /*
2499                 * For remote invalidation, just set it to MR_INVALIDATED
2500                 * and defer to mr_recovery_work to recover the MR for next use
2501                 */
2502                smbdirect_mr->state = MR_INVALIDATED;
2503
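            /*
             * If the MR has been invalidated (locally above, or remotely by
             * the peer), unmap its pages and make it available again right
             * away; otherwise defer to the MR recovery work.
             */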
2504        if (smbdirect_mr->state == MR_INVALIDATED) {
2505                ib_dma_unmap_sg(
2506                        info->id->device, smbdirect_mr->sgl,
2507                        smbdirect_mr->sgl_count,
2508                        smbdirect_mr->dir);
2509                smbdirect_mr->state = MR_READY;
2510                if (atomic_inc_return(&info->mr_ready_count) == 1)
2511                        wake_up_interruptible(&info->wait_mr);
2512        } else
2513                /*
2514                 * Schedule the work to do MR recovery for future I/Os. MR
2515                 * recovery is slow and we don't want it to block the current I/O
2516                 */
2517                queue_work(info->workqueue, &info->mr_recovery_work);
2518
2519done:
2520        if (atomic_dec_and_test(&info->mr_used_count))
2521                wake_up(&info->wait_for_mr_cleanup);
2522
2523        return rc;
2524}
2525