linux/fs/cifs/smbdirect.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 *   Copyright (C) 2017, Microsoft Corporation.
   4 *
   5 *   Author(s): Long Li <longli@microsoft.com>
   6 */
   7#include <linux/module.h>
   8#include <linux/highmem.h>
   9#include "smbdirect.h"
  10#include "cifs_debug.h"
  11#include "cifsproto.h"
  12#include "smb2proto.h"
  13
  14static struct smbd_response *get_empty_queue_buffer(
  15                struct smbd_connection *info);
  16static struct smbd_response *get_receive_buffer(
  17                struct smbd_connection *info);
  18static void put_receive_buffer(
  19                struct smbd_connection *info,
  20                struct smbd_response *response);
  21static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  22static void destroy_receive_buffers(struct smbd_connection *info);
  23
  24static void put_empty_packet(
  25                struct smbd_connection *info, struct smbd_response *response);
  26static void enqueue_reassembly(
  27                struct smbd_connection *info,
  28                struct smbd_response *response, int data_length);
  29static struct smbd_response *_get_first_reassembly(
  30                struct smbd_connection *info);
  31
  32static int smbd_post_recv(
  33                struct smbd_connection *info,
  34                struct smbd_response *response);
  35
  36static int smbd_post_send_empty(struct smbd_connection *info);
  37static int smbd_post_send_data(
  38                struct smbd_connection *info,
  39                struct kvec *iov, int n_vec, int remaining_data_length);
  40static int smbd_post_send_page(struct smbd_connection *info,
  41                struct page *page, unsigned long offset,
  42                size_t size, int remaining_data_length);
  43
  44static void destroy_mr_list(struct smbd_connection *info);
  45static int allocate_mr_list(struct smbd_connection *info);
  46
  47/* SMBD version number */
  48#define SMBD_V1 0x0100
  49
  50/* Port numbers for SMBD transport */
  51#define SMB_PORT        445
  52#define SMBD_PORT       5445
  53
  54/* Address lookup and resolve timeout in ms */
  55#define RDMA_RESOLVE_TIMEOUT    5000
  56
  57/* SMBD negotiation timeout in seconds */
  58#define SMBD_NEGOTIATE_TIMEOUT  120
  59
  60/* SMBD minimum receive size and fragmented size defined in [MS-SMBD] */
  61#define SMBD_MIN_RECEIVE_SIZE           128
  62#define SMBD_MIN_FRAGMENTED_SIZE        131072
  63
  64/*
  65 * Default maximum number of RDMA read/write outstanding on this connection
  66 * This value may be decreased during QP creation, subject to hardware limits
  67 */
  68#define SMBD_CM_RESPONDER_RESOURCES     32
  69
  70/* Maximum number of retries on data transfer operations */
  71#define SMBD_CM_RETRY                   6
  72/* No need to retry on Receiver Not Ready since SMBD manages credits */
  73#define SMBD_CM_RNR_RETRY               0
  74
  75/*
  76 * User configurable initial values per SMBD transport connection
  77 * as defined in [MS-SMBD] 3.1.1.1
  78 * Those may change after a SMBD negotiation
  79 */
  80/* The local peer's maximum number of credits to grant to the peer */
  81int smbd_receive_credit_max = 255;
  82
  83/* The number of send credits the local peer requests from the remote peer */
  84int smbd_send_credit_target = 255;
  85
  86/* The maximum size of a single message that can be sent to the remote peer */
  87int smbd_max_send_size = 1364;
  88
  89/*  The maximum fragmented upper-layer payload receive size supported */
  90int smbd_max_fragmented_recv_size = 1024 * 1024;
  91
  92/*  The maximum single-message size which can be received */
  93int smbd_max_receive_size = 8192;
  94
  95/* The timeout to initiate send of a keepalive message on idle */
  96int smbd_keep_alive_interval = 120;
  97
  98/*
  99 * User configurable initial values for RDMA transport
 100 * The actual values used may be lower and are limited to hardware capabilities
 101 */
 102/* Default maximum number of SGEs in a RDMA write/read */
 103int smbd_max_frmr_depth = 2048;
 104
 105/* If payload is smaller than this many bytes, use RDMA send/recv instead of read/write */
 106int rdma_readwrite_threshold = 4096;
 107
 108/* Transport logging functions
 109 * Logging is organized into classes. Classes can be OR'ed together to select
 110 * what gets logged via the module parameter smbd_logging_class
 111 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 112 * log_rdma_event()
 113 */
 114#define LOG_OUTGOING                    0x1
 115#define LOG_INCOMING                    0x2
 116#define LOG_READ                        0x4
 117#define LOG_WRITE                       0x8
 118#define LOG_RDMA_SEND                   0x10
 119#define LOG_RDMA_RECV                   0x20
 120#define LOG_KEEP_ALIVE                  0x40
 121#define LOG_RDMA_EVENT                  0x80
 122#define LOG_RDMA_MR                     0x100
 123static unsigned int smbd_logging_class;
 124module_param(smbd_logging_class, uint, 0644);
 125MODULE_PARM_DESC(smbd_logging_class,
 126        "Logging class for SMBD transport 0x0 to 0x100");
 127
 128#define ERR             0x0
 129#define INFO            0x1
 130static unsigned int smbd_logging_level = ERR;
 131module_param(smbd_logging_level, uint, 0644);
 132MODULE_PARM_DESC(smbd_logging_level,
 133        "Logging level for SMBD transport, 0 (default): error, 1: info");
 134
 135#define log_rdma(level, class, fmt, args...)                            \
 136do {                                                                    \
 137        if (level <= smbd_logging_level || class & smbd_logging_class)  \
 138                cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 139} while (0)
 140
 141#define log_outgoing(level, fmt, args...) \
 142                log_rdma(level, LOG_OUTGOING, fmt, ##args)
 143#define log_incoming(level, fmt, args...) \
 144                log_rdma(level, LOG_INCOMING, fmt, ##args)
 145#define log_read(level, fmt, args...)   log_rdma(level, LOG_READ, fmt, ##args)
 146#define log_write(level, fmt, args...)  log_rdma(level, LOG_WRITE, fmt, ##args)
 147#define log_rdma_send(level, fmt, args...) \
 148                log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 149#define log_rdma_recv(level, fmt, args...) \
 150                log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 151#define log_keep_alive(level, fmt, args...) \
 152                log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 153#define log_rdma_event(level, fmt, args...) \
 154                log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 155#define log_rdma_mr(level, fmt, args...) \
 156                log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 157
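    /*
     * Disconnect handling: smbd_disconnect_rdma_connection() queues
     * disconnect_work on the connection workqueue so that a disconnect can be
     * requested from completion handlers; the work function transitions a
     * connected transport to SMBD_DISCONNECTING and calls rdma_disconnect()
     */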
 158static void smbd_disconnect_rdma_work(struct work_struct *work)
 159{
 160        struct smbd_connection *info =
 161                container_of(work, struct smbd_connection, disconnect_work);
 162
 163        if (info->transport_status == SMBD_CONNECTED) {
 164                info->transport_status = SMBD_DISCONNECTING;
 165                rdma_disconnect(info->id);
 166        }
 167}
 168
 169static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 170{
 171        queue_work(info->workqueue, &info->disconnect_work);
 172}
 173
 174/* Upcall from RDMA CM */
 175static int smbd_conn_upcall(
 176                struct rdma_cm_id *id, struct rdma_cm_event *event)
 177{
 178        struct smbd_connection *info = id->context;
 179
 180        log_rdma_event(INFO, "event=%d status=%d\n",
 181                event->event, event->status);
 182
 183        switch (event->event) {
 184        case RDMA_CM_EVENT_ADDR_RESOLVED:
 185        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 186                info->ri_rc = 0;
 187                complete(&info->ri_done);
 188                break;
 189
 190        case RDMA_CM_EVENT_ADDR_ERROR:
 191                info->ri_rc = -EHOSTUNREACH;
 192                complete(&info->ri_done);
 193                break;
 194
 195        case RDMA_CM_EVENT_ROUTE_ERROR:
 196                info->ri_rc = -ENETUNREACH;
 197                complete(&info->ri_done);
 198                break;
 199
 200        case RDMA_CM_EVENT_ESTABLISHED:
 201                log_rdma_event(INFO, "connected event=%d\n", event->event);
 202                info->transport_status = SMBD_CONNECTED;
 203                wake_up_interruptible(&info->conn_wait);
 204                break;
 205
 206        case RDMA_CM_EVENT_CONNECT_ERROR:
 207        case RDMA_CM_EVENT_UNREACHABLE:
 208        case RDMA_CM_EVENT_REJECTED:
 209                log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 210                info->transport_status = SMBD_DISCONNECTED;
 211                wake_up_interruptible(&info->conn_wait);
 212                break;
 213
 214        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 215        case RDMA_CM_EVENT_DISCONNECTED:
 216                /* This happens when we fail the negotiation */
 217                if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 218                        info->transport_status = SMBD_DISCONNECTED;
 219                        wake_up(&info->conn_wait);
 220                        break;
 221                }
 222
 223                info->transport_status = SMBD_DISCONNECTED;
 224                wake_up_interruptible(&info->disconn_wait);
 225                wake_up_interruptible(&info->wait_reassembly_queue);
 226                wake_up_interruptible_all(&info->wait_send_queue);
 227                break;
 228
 229        default:
 230                break;
 231        }
 232
 233        return 0;
 234}
 235
 236/* Upcall from RDMA QP */
 237static void
 238smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 239{
 240        struct smbd_connection *info = context;
 241
 242        log_rdma_event(ERR, "%s on device %s info %p\n",
 243                ib_event_msg(event->event), event->device->name, info);
 244
 245        switch (event->event) {
 246        case IB_EVENT_CQ_ERR:
 247        case IB_EVENT_QP_FATAL:
 248                smbd_disconnect_rdma_connection(info);
 249
 250        default:
 251                break;
 252        }
 253}
 254
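    /*
     * Return a pointer to the SMBD packet payload that is laid out
     * immediately after the request/response descriptor
     */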
 255static inline void *smbd_request_payload(struct smbd_request *request)
 256{
 257        return (void *)request->packet;
 258}
 259
 260static inline void *smbd_response_payload(struct smbd_response *response)
 261{
 262        return (void *)response->packet;
 263}
 264
 265/* Called when a RDMA send is done */
 266static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 267{
 268        int i;
 269        struct smbd_request *request =
 270                container_of(wc->wr_cqe, struct smbd_request, cqe);
 271
 272        log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 273                request, wc->status);
 274
 275        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 276                log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 277                        wc->status, wc->opcode);
 278                smbd_disconnect_rdma_connection(request->info);
 279        }
 280
 281        for (i = 0; i < request->num_sge; i++)
 282                ib_dma_unmap_single(request->info->id->device,
 283                        request->sge[i].addr,
 284                        request->sge[i].length,
 285                        DMA_TO_DEVICE);
 286
 287        if (atomic_dec_and_test(&request->info->send_pending))
 288                wake_up(&request->info->wait_send_pending);
 289
 290        wake_up(&request->info->wait_post_send);
 291
 292        mempool_free(request, request->info->request_mempool);
 293}
 294
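    /* Log the fields of a received negotiate response (debugging aid) */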
 295static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 296{
 297        log_rdma_event(INFO, "resp message min_version %u max_version %u negotiated_version %u credits_requested %u credits_granted %u status %u max_readwrite_size %u preferred_send_size %u max_receive_size %u max_fragmented_size %u\n",
 298                       resp->min_version, resp->max_version,
 299                       resp->negotiated_version, resp->credits_requested,
 300                       resp->credits_granted, resp->status,
 301                       resp->max_readwrite_size, resp->preferred_send_size,
 302                       resp->max_receive_size, resp->max_fragmented_size);
 303}
 304
 305/*
 306 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
 307 * response, packet_length: the negotiation response message
 308 * return value: true if negotiation is a success, false if failed
 309 */
 310static bool process_negotiation_response(
 311                struct smbd_response *response, int packet_length)
 312{
 313        struct smbd_connection *info = response->info;
 314        struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 315
 316        if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 317                log_rdma_event(ERR,
 318                        "error: packet_length=%d\n", packet_length);
 319                return false;
 320        }
 321
 322        if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 323                log_rdma_event(ERR, "error: negotiated_version=%x\n",
 324                        le16_to_cpu(packet->negotiated_version));
 325                return false;
 326        }
 327        info->protocol = le16_to_cpu(packet->negotiated_version);
 328
 329        if (packet->credits_requested == 0) {
 330                log_rdma_event(ERR, "error: credits_requested==0\n");
 331                return false;
 332        }
 333        info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 334
 335        if (packet->credits_granted == 0) {
 336                log_rdma_event(ERR, "error: credits_granted==0\n");
 337                return false;
 338        }
 339        atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 340
 341        atomic_set(&info->receive_credits, 0);
 342
 343        if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 344                log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 345                        le32_to_cpu(packet->preferred_send_size));
 346                return false;
 347        }
 348        info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 349
 350        if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 351                log_rdma_event(ERR, "error: max_receive_size=%d\n",
 352                        le32_to_cpu(packet->max_receive_size));
 353                return false;
 354        }
 355        info->max_send_size = min_t(int, info->max_send_size,
 356                                        le32_to_cpu(packet->max_receive_size));
 357
 358        if (le32_to_cpu(packet->max_fragmented_size) <
 359                        SMBD_MIN_FRAGMENTED_SIZE) {
 360                log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 361                        le32_to_cpu(packet->max_fragmented_size));
 362                return false;
 363        }
 364        info->max_fragmented_send_size =
 365                le32_to_cpu(packet->max_fragmented_size);
 366        info->rdma_readwrite_threshold =
 367                rdma_readwrite_threshold > info->max_fragmented_send_size ?
 368                info->max_fragmented_send_size :
 369                rdma_readwrite_threshold;
 370
 371
 372        info->max_readwrite_size = min_t(u32,
 373                        le32_to_cpu(packet->max_readwrite_size),
 374                        info->max_frmr_depth * PAGE_SIZE);
 375        info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 376
 377        return true;
 378}
 379
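    /*
     * Work handler that re-posts available receive buffers when receive
     * credits have fallen below the target, records the newly offered credits
     * in new_credits_offered, and sends an empty message if needed so the new
     * credits are promptly granted to the peer
     */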
 380static void smbd_post_send_credits(struct work_struct *work)
 381{
 382        int ret = 0;
 383        int use_receive_queue = 1;
 384        int rc;
 385        struct smbd_response *response;
 386        struct smbd_connection *info =
 387                container_of(work, struct smbd_connection,
 388                        post_send_credits_work);
 389
 390        if (info->transport_status != SMBD_CONNECTED) {
 391                wake_up(&info->wait_receive_queues);
 392                return;
 393        }
 394
 395        if (info->receive_credit_target >
 396                atomic_read(&info->receive_credits)) {
 397                while (true) {
 398                        if (use_receive_queue)
 399                                response = get_receive_buffer(info);
 400                        else
 401                                response = get_empty_queue_buffer(info);
 402                        if (!response) {
 403                                /* now switch to the empty packet queue */
 404                                if (use_receive_queue) {
 405                                        use_receive_queue = 0;
 406                                        continue;
 407                                } else
 408                                        break;
 409                        }
 410
 411                        response->type = SMBD_TRANSFER_DATA;
 412                        response->first_segment = false;
 413                        rc = smbd_post_recv(info, response);
 414                        if (rc) {
 415                                log_rdma_recv(ERR,
 416                                        "post_recv failed rc=%d\n", rc);
 417                                put_receive_buffer(info, response);
 418                                break;
 419                        }
 420
 421                        ret++;
 422                }
 423        }
 424
 425        spin_lock(&info->lock_new_credits_offered);
 426        info->new_credits_offered += ret;
 427        spin_unlock(&info->lock_new_credits_offered);
 428
 429        /* Promptly send an immediate packet as defined in [MS-SMBD] 3.1.1.1 */
 430        info->send_immediate = true;
 431        if (atomic_read(&info->receive_credits) <
 432                info->receive_credit_target - 1) {
 433                if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
 434                    info->send_immediate) {
 435                        log_keep_alive(INFO, "send an empty message\n");
 436                        smbd_post_send_empty(info);
 437                }
 438        }
 439}
 440
 441/* Called from softirq, when recv is done */
 442static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 443{
 444        struct smbd_data_transfer *data_transfer;
 445        struct smbd_response *response =
 446                container_of(wc->wr_cqe, struct smbd_response, cqe);
 447        struct smbd_connection *info = response->info;
 448        int data_length = 0;
 449
 450        log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d byte_len=%d pkey_index=%x\n",
 451                      response, response->type, wc->status, wc->opcode,
 452                      wc->byte_len, wc->pkey_index);
 453
 454        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 455                log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 456                        wc->status, wc->opcode);
 457                smbd_disconnect_rdma_connection(info);
 458                goto error;
 459        }
 460
 461        ib_dma_sync_single_for_cpu(
 462                wc->qp->device,
 463                response->sge.addr,
 464                response->sge.length,
 465                DMA_FROM_DEVICE);
 466
 467        switch (response->type) {
 468        /* SMBD negotiation response */
 469        case SMBD_NEGOTIATE_RESP:
 470                dump_smbd_negotiate_resp(smbd_response_payload(response));
 471                info->full_packet_received = true;
 472                info->negotiate_done =
 473                        process_negotiation_response(response, wc->byte_len);
 474                complete(&info->negotiate_completion);
 475                break;
 476
 477        /* SMBD data transfer packet */
 478        case SMBD_TRANSFER_DATA:
 479                data_transfer = smbd_response_payload(response);
 480                data_length = le32_to_cpu(data_transfer->data_length);
 481
 482                /*
 483                 * If this is a packet with data payload, place the data in the
 484                 * reassembly queue and wake up the reading thread
 485                 */
 486                if (data_length) {
 487                        if (info->full_packet_received)
 488                                response->first_segment = true;
 489
 490                        if (le32_to_cpu(data_transfer->remaining_data_length))
 491                                info->full_packet_received = false;
 492                        else
 493                                info->full_packet_received = true;
 494
 495                        enqueue_reassembly(
 496                                info,
 497                                response,
 498                                data_length);
 499                } else
 500                        put_empty_packet(info, response);
 501
 502                if (data_length)
 503                        wake_up_interruptible(&info->wait_reassembly_queue);
 504
 505                atomic_dec(&info->receive_credits);
 506                info->receive_credit_target =
 507                        le16_to_cpu(data_transfer->credits_requested);
 508                if (le16_to_cpu(data_transfer->credits_granted)) {
 509                        atomic_add(le16_to_cpu(data_transfer->credits_granted),
 510                                &info->send_credits);
 511                        /*
 512                         * We have new send credits granted from remote peer
 513                         * If any sender is waiting for credits, unblock it
 514                         */
 515                        wake_up_interruptible(&info->wait_send_queue);
 516                }
 517
 518                log_incoming(INFO, "data flags %d data_offset %d data_length %d remaining_data_length %d\n",
 519                             le16_to_cpu(data_transfer->flags),
 520                             le32_to_cpu(data_transfer->data_offset),
 521                             le32_to_cpu(data_transfer->data_length),
 522                             le32_to_cpu(data_transfer->remaining_data_length));
 523
 524                /* Send a KEEP_ALIVE response right away if requested */
 525                info->keep_alive_requested = KEEP_ALIVE_NONE;
 526                if (le16_to_cpu(data_transfer->flags) &
 527                                SMB_DIRECT_RESPONSE_REQUESTED) {
 528                        info->keep_alive_requested = KEEP_ALIVE_PENDING;
 529                }
 530
 531                return;
 532
 533        default:
 534                log_rdma_recv(ERR,
 535                        "unexpected response type=%d\n", response->type);
 536        }
 537
 538error:
 539        put_receive_buffer(info, response);
 540}
 541
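    /*
     * Create an RDMA CM ID for this connection and resolve the destination
     * address and route, waiting up to RDMA_RESOLVE_TIMEOUT ms for each step
     * return value: the CM ID, or an ERR_PTR on failure
     */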
 542static struct rdma_cm_id *smbd_create_id(
 543                struct smbd_connection *info,
 544                struct sockaddr *dstaddr, int port)
 545{
 546        struct rdma_cm_id *id;
 547        int rc;
 548        __be16 *sport;
 549
 550        id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 551                RDMA_PS_TCP, IB_QPT_RC);
 552        if (IS_ERR(id)) {
 553                rc = PTR_ERR(id);
 554                log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 555                return id;
 556        }
 557
 558        if (dstaddr->sa_family == AF_INET6)
 559                sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 560        else
 561                sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 562
 563        *sport = htons(port);
 564
 565        init_completion(&info->ri_done);
 566        info->ri_rc = -ETIMEDOUT;
 567
 568        rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 569                RDMA_RESOLVE_TIMEOUT);
 570        if (rc) {
 571                log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 572                goto out;
 573        }
 574        wait_for_completion_interruptible_timeout(
 575                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 576        rc = info->ri_rc;
 577        if (rc) {
 578                log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 579                goto out;
 580        }
 581
 582        info->ri_rc = -ETIMEDOUT;
 583        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 584        if (rc) {
 585                log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 586                goto out;
 587        }
 588        wait_for_completion_interruptible_timeout(
 589                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 590        rc = info->ri_rc;
 591        if (rc) {
 592                log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 593                goto out;
 594        }
 595
 596        return id;
 597
 598out:
 599        rdma_destroy_id(id);
 600        return ERR_PTR(rc);
 601}
 602
 603/*
 604 * Test if FRWR (Fast Registration Work Requests) is supported on the device
 605 * This implementation requires FRWR for RDMA read/write
 606 * return value: true if it is supported
 607 */
 608static bool frwr_is_supported(struct ib_device_attr *attrs)
 609{
 610        if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 611                return false;
 612        if (attrs->max_fast_reg_page_list_len == 0)
 613                return false;
 614        return true;
 615}
 616
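    /*
     * Open the RDMA interface adapter for a connection: create and resolve
     * the CM ID, verify that the device supports FRWR, choose the MR type and
     * maximum FRMR depth, and allocate the protection domain
     */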
 617static int smbd_ia_open(
 618                struct smbd_connection *info,
 619                struct sockaddr *dstaddr, int port)
 620{
 621        int rc;
 622
 623        info->id = smbd_create_id(info, dstaddr, port);
 624        if (IS_ERR(info->id)) {
 625                rc = PTR_ERR(info->id);
 626                goto out1;
 627        }
 628
 629        if (!frwr_is_supported(&info->id->device->attrs)) {
 630                log_rdma_event(ERR, "Fast Registration Work Requests (FRWR) is not supported\n");
 631                log_rdma_event(ERR, "Device capability flags = %llx max_fast_reg_page_list_len = %u\n",
 632                               info->id->device->attrs.device_cap_flags,
 633                               info->id->device->attrs.max_fast_reg_page_list_len);
 634                rc = -EPROTONOSUPPORT;
 635                goto out2;
 636        }
 637        info->max_frmr_depth = min_t(int,
 638                smbd_max_frmr_depth,
 639                info->id->device->attrs.max_fast_reg_page_list_len);
 640        info->mr_type = IB_MR_TYPE_MEM_REG;
 641        if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 642                info->mr_type = IB_MR_TYPE_SG_GAPS;
 643
 644        info->pd = ib_alloc_pd(info->id->device, 0);
 645        if (IS_ERR(info->pd)) {
 646                rc = PTR_ERR(info->pd);
 647                log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 648                goto out2;
 649        }
 650
 651        return 0;
 652
 653out2:
 654        rdma_destroy_id(info->id);
 655        info->id = NULL;
 656
 657out1:
 658        return rc;
 659}
 660
 661/*
 662 * Send a negotiation request message to the peer
 663 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 664 * After negotiation, the transport is connected and ready for
 665 * carrying upper layer SMB payload
 666 */
 667static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 668{
 669        struct ib_send_wr send_wr;
 670        int rc = -ENOMEM;
 671        struct smbd_request *request;
 672        struct smbd_negotiate_req *packet;
 673
 674        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 675        if (!request)
 676                return rc;
 677
 678        request->info = info;
 679
 680        packet = smbd_request_payload(request);
 681        packet->min_version = cpu_to_le16(SMBD_V1);
 682        packet->max_version = cpu_to_le16(SMBD_V1);
 683        packet->reserved = 0;
 684        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 685        packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 686        packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 687        packet->max_fragmented_size =
 688                cpu_to_le32(info->max_fragmented_recv_size);
 689
 690        request->num_sge = 1;
 691        request->sge[0].addr = ib_dma_map_single(
 692                                info->id->device, (void *)packet,
 693                                sizeof(*packet), DMA_TO_DEVICE);
 694        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 695                rc = -EIO;
 696                goto dma_mapping_failed;
 697        }
 698
 699        request->sge[0].length = sizeof(*packet);
 700        request->sge[0].lkey = info->pd->local_dma_lkey;
 701
 702        ib_dma_sync_single_for_device(
 703                info->id->device, request->sge[0].addr,
 704                request->sge[0].length, DMA_TO_DEVICE);
 705
 706        request->cqe.done = send_done;
 707
 708        send_wr.next = NULL;
 709        send_wr.wr_cqe = &request->cqe;
 710        send_wr.sg_list = request->sge;
 711        send_wr.num_sge = request->num_sge;
 712        send_wr.opcode = IB_WR_SEND;
 713        send_wr.send_flags = IB_SEND_SIGNALED;
 714
 715        log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 716                request->sge[0].addr,
 717                request->sge[0].length, request->sge[0].lkey);
 718
 719        atomic_inc(&info->send_pending);
 720        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 721        if (!rc)
 722                return 0;
 723
 724        /* if we reach here, post send failed */
 725        log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 726        atomic_dec(&info->send_pending);
 727        ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 728                request->sge[0].length, DMA_TO_DEVICE);
 729
 730        smbd_disconnect_rdma_connection(info);
 731
 732dma_mapping_failed:
 733        mempool_free(request, info->request_mempool);
 734        return rc;
 735}
 736
 737/*
 738 * Extend the credits to remote peer
 739 * This implements [MS-SMBD] 3.1.5.9
 740 * The idea is that we should extend credits to the remote peer as quickly as
 741 * allowed, to maintain data flow. We allocate as many receive
 742 * buffers as possible, and extend the receive credits to the remote peer
 743 * return value: the new credits being granted.
 744 */
 745static int manage_credits_prior_sending(struct smbd_connection *info)
 746{
 747        int new_credits;
 748
 749        spin_lock(&info->lock_new_credits_offered);
 750        new_credits = info->new_credits_offered;
 751        info->new_credits_offered = 0;
 752        spin_unlock(&info->lock_new_credits_offered);
 753
 754        return new_credits;
 755}
 756
 757/*
 758 * Check if we need to send a KEEP_ALIVE message
 759 * The idle connection timer triggers a KEEP_ALIVE message when it expires
 760 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flags to have the peer
 761 * send back a response.
 762 * return value:
 763 * 1: SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 764 * 0: otherwise
 765 */
 766static int manage_keep_alive_before_sending(struct smbd_connection *info)
 767{
 768        if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 769                info->keep_alive_requested = KEEP_ALIVE_SENT;
 770                return 1;
 771        }
 772        return 0;
 773}
 774
 775/* Post the send request */
 776static int smbd_post_send(struct smbd_connection *info,
 777                struct smbd_request *request)
 778{
 779        struct ib_send_wr send_wr;
 780        int rc, i;
 781
 782        for (i = 0; i < request->num_sge; i++) {
 783                log_rdma_send(INFO,
 784                        "rdma_request sge[%d] addr=%llu length=%u\n",
 785                        i, request->sge[i].addr, request->sge[i].length);
 786                ib_dma_sync_single_for_device(
 787                        info->id->device,
 788                        request->sge[i].addr,
 789                        request->sge[i].length,
 790                        DMA_TO_DEVICE);
 791        }
 792
 793        request->cqe.done = send_done;
 794
 795        send_wr.next = NULL;
 796        send_wr.wr_cqe = &request->cqe;
 797        send_wr.sg_list = request->sge;
 798        send_wr.num_sge = request->num_sge;
 799        send_wr.opcode = IB_WR_SEND;
 800        send_wr.send_flags = IB_SEND_SIGNALED;
 801
 802        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 803        if (rc) {
 804                log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 805                smbd_disconnect_rdma_connection(info);
 806                rc = -EAGAIN;
 807        } else
 808                /* Reset timer for idle connection after packet is sent */
 809                mod_delayed_work(info->workqueue, &info->idle_timer_work,
 810                        info->keep_alive_interval*HZ);
 811
 812        return rc;
 813}
 814
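    /*
     * Build and post a single SMBD data transfer packet carrying the payload
     * described by sgl (or no payload if sgl is NULL)
     * Waits for a send credit and for room on the send queue, fills in the
     * smbd_data_transfer header (granting any accumulated receive credits),
     * DMA-maps the header and payload, then posts the send
     */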
 815static int smbd_post_send_sgl(struct smbd_connection *info,
 816        struct scatterlist *sgl, int data_length, int remaining_data_length)
 817{
 818        int num_sgs;
 819        int i, rc;
 820        int header_length;
 821        struct smbd_request *request;
 822        struct smbd_data_transfer *packet;
 823        int new_credits;
 824        struct scatterlist *sg;
 825
 826wait_credit:
 827        /* Wait for send credits. A SMBD packet needs one credit */
 828        rc = wait_event_interruptible(info->wait_send_queue,
 829                atomic_read(&info->send_credits) > 0 ||
 830                info->transport_status != SMBD_CONNECTED);
 831        if (rc)
 832                goto err_wait_credit;
 833
 834        if (info->transport_status != SMBD_CONNECTED) {
 835                log_outgoing(ERR, "disconnected not sending on wait_credit\n");
 836                rc = -EAGAIN;
 837                goto err_wait_credit;
 838        }
 839        if (unlikely(atomic_dec_return(&info->send_credits) < 0)) {
 840                atomic_inc(&info->send_credits);
 841                goto wait_credit;
 842        }
 843
 844wait_send_queue:
 845        wait_event(info->wait_post_send,
 846                atomic_read(&info->send_pending) < info->send_credit_target ||
 847                info->transport_status != SMBD_CONNECTED);
 848
 849        if (info->transport_status != SMBD_CONNECTED) {
 850                log_outgoing(ERR, "disconnected not sending on wait_send_queue\n");
 851                rc = -EAGAIN;
 852                goto err_wait_send_queue;
 853        }
 854
 855        if (unlikely(atomic_inc_return(&info->send_pending) >
 856                                info->send_credit_target)) {
 857                atomic_dec(&info->send_pending);
 858                goto wait_send_queue;
 859        }
 860
 861        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 862        if (!request) {
 863                rc = -ENOMEM;
 864                goto err_alloc;
 865        }
 866
 867        request->info = info;
 868
 869        /* Fill in the packet header */
 870        packet = smbd_request_payload(request);
 871        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 872
 873        new_credits = manage_credits_prior_sending(info);
 874        atomic_add(new_credits, &info->receive_credits);
 875        packet->credits_granted = cpu_to_le16(new_credits);
 876
 877        info->send_immediate = false;
 878
 879        packet->flags = 0;
 880        if (manage_keep_alive_before_sending(info))
 881                packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 882
 883        packet->reserved = 0;
 884        if (!data_length)
 885                packet->data_offset = 0;
 886        else
 887                packet->data_offset = cpu_to_le32(24);
 888        packet->data_length = cpu_to_le32(data_length);
 889        packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 890        packet->padding = 0;
 891
 892        log_outgoing(INFO, "credits_requested=%d credits_granted=%d data_offset=%d data_length=%d remaining_data_length=%d\n",
 893                     le16_to_cpu(packet->credits_requested),
 894                     le16_to_cpu(packet->credits_granted),
 895                     le32_to_cpu(packet->data_offset),
 896                     le32_to_cpu(packet->data_length),
 897                     le32_to_cpu(packet->remaining_data_length));
 898
 899        /* Map the packet to DMA */
 900        header_length = sizeof(struct smbd_data_transfer);
 901        /* If this is a packet without payload, don't send padding */
 902        if (!data_length)
 903                header_length = offsetof(struct smbd_data_transfer, padding);
 904
 905        request->num_sge = 1;
 906        request->sge[0].addr = ib_dma_map_single(info->id->device,
 907                                                 (void *)packet,
 908                                                 header_length,
 909                                                 DMA_TO_DEVICE);
 910        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 911                rc = -EIO;
 912                request->sge[0].addr = 0;
 913                goto err_dma;
 914        }
 915
 916        request->sge[0].length = header_length;
 917        request->sge[0].lkey = info->pd->local_dma_lkey;
 918
 919        /* Fill in the packet data payload */
 920        num_sgs = sgl ? sg_nents(sgl) : 0;
 921        for_each_sg(sgl, sg, num_sgs, i) {
 922                request->sge[i+1].addr =
 923                        ib_dma_map_page(info->id->device, sg_page(sg),
 924                               sg->offset, sg->length, DMA_TO_DEVICE);
 925                if (ib_dma_mapping_error(
 926                                info->id->device, request->sge[i+1].addr)) {
 927                        rc = -EIO;
 928                        request->sge[i+1].addr = 0;
 929                        goto err_dma;
 930                }
 931                request->sge[i+1].length = sg->length;
 932                request->sge[i+1].lkey = info->pd->local_dma_lkey;
 933                request->num_sge++;
 934        }
 935
 936        rc = smbd_post_send(info, request);
 937        if (!rc)
 938                return 0;
 939
 940err_dma:
 941        for (i = 0; i < request->num_sge; i++)
 942                if (request->sge[i].addr)
 943                        ib_dma_unmap_single(info->id->device,
 944                                            request->sge[i].addr,
 945                                            request->sge[i].length,
 946                                            DMA_TO_DEVICE);
 947        mempool_free(request, info->request_mempool);
 948
 949        /* roll back receive credits and credits to be offered */
 950        spin_lock(&info->lock_new_credits_offered);
 951        info->new_credits_offered += new_credits;
 952        spin_unlock(&info->lock_new_credits_offered);
 953        atomic_sub(new_credits, &info->receive_credits);
 954
 955err_alloc:
 956        if (atomic_dec_and_test(&info->send_pending))
 957                wake_up(&info->wait_send_pending);
 958
 959err_wait_send_queue:
 960        /* roll back send credits and pending */
 961        atomic_inc(&info->send_credits);
 962
 963err_wait_credit:
 964        return rc;
 965}
 966
 967/*
 968 * Send a page
 969 * page: the page to send
 970 * offset: offset in the page to send
 971 * size: length in the page to send
 972 * remaining_data_length: remaining data to send in this payload
 973 */
 974static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
 975                unsigned long offset, size_t size, int remaining_data_length)
 976{
 977        struct scatterlist sgl;
 978
 979        sg_init_table(&sgl, 1);
 980        sg_set_page(&sgl, page, size, offset);
 981
 982        return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
 983}
 984
 985/*
 986 * Send an empty message
 987 * An empty message is used to extend credits to the peer and for keep alive
 988 * while there is no upper layer payload to send at the time
 989 */
 990static int smbd_post_send_empty(struct smbd_connection *info)
 991{
 992        info->count_send_empty++;
 993        return smbd_post_send_sgl(info, NULL, 0, 0);
 994}
 995
 996/*
 997 * Send a data buffer
 998 * iov: the iov array describing the data buffers
 999 * n_vec: number of entries in the iov array
1000 * remaining_data_length: remaining data to send following this packet
1001 * when the payload is segmented across multiple SMBD packets
1002 */
1003static int smbd_post_send_data(
1004        struct smbd_connection *info, struct kvec *iov, int n_vec,
1005        int remaining_data_length)
1006{
1007        int i;
1008        u32 data_length = 0;
1009        struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1010
1011        if (n_vec > SMBDIRECT_MAX_SGE) {
1012                cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1013                return -EINVAL;
1014        }
1015
1016        sg_init_table(sgl, n_vec);
1017        for (i = 0; i < n_vec; i++) {
1018                data_length += iov[i].iov_len;
1019                sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1020        }
1021
1022        return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1023}
1024
1025/*
1026 * Post a receive request to the transport
1027 * The remote peer can only send data when a receive request is posted
1028 * The interaction is controlled by the send/receive credit system
1029 */
1030static int smbd_post_recv(
1031                struct smbd_connection *info, struct smbd_response *response)
1032{
1033        struct ib_recv_wr recv_wr;
1034        int rc = -EIO;
1035
1036        response->sge.addr = ib_dma_map_single(
1037                                info->id->device, response->packet,
1038                                info->max_receive_size, DMA_FROM_DEVICE);
1039        if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1040                return rc;
1041
1042        response->sge.length = info->max_receive_size;
1043        response->sge.lkey = info->pd->local_dma_lkey;
1044
1045        response->cqe.done = recv_done;
1046
1047        recv_wr.wr_cqe = &response->cqe;
1048        recv_wr.next = NULL;
1049        recv_wr.sg_list = &response->sge;
1050        recv_wr.num_sge = 1;
1051
1052        rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
1053        if (rc) {
1054                ib_dma_unmap_single(info->id->device, response->sge.addr,
1055                                    response->sge.length, DMA_FROM_DEVICE);
1056                smbd_disconnect_rdma_connection(info);
1057                log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1058        }
1059
1060        return rc;
1061}
1062
1063/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1064static int smbd_negotiate(struct smbd_connection *info)
1065{
1066        int rc;
1067        struct smbd_response *response = get_receive_buffer(info);
1068
1069        response->type = SMBD_NEGOTIATE_RESP;
1070        rc = smbd_post_recv(info, response);
1071        log_rdma_event(INFO, "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x iov.lkey=%x\n",
1072                       rc, response->sge.addr,
1073                       response->sge.length, response->sge.lkey);
1074        if (rc)
1075                return rc;
1076
1077        init_completion(&info->negotiate_completion);
1078        info->negotiate_done = false;
1079        rc = smbd_post_send_negotiate_req(info);
1080        if (rc)
1081                return rc;
1082
1083        rc = wait_for_completion_interruptible_timeout(
1084                &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1085        log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1086
1087        if (info->negotiate_done)
1088                return 0;
1089
1090        if (rc == 0)
1091                rc = -ETIMEDOUT;
1092        else if (rc == -ERESTARTSYS)
1093                rc = -EINTR;
1094        else
1095                rc = -ENOTCONN;
1096
1097        return rc;
1098}
1099
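    /*
     * Return a drained response buffer to the empty packet queue and kick
     * post_send_credits_work so it can be re-posted as a receive
     */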
1100static void put_empty_packet(
1101                struct smbd_connection *info, struct smbd_response *response)
1102{
1103        spin_lock(&info->empty_packet_queue_lock);
1104        list_add_tail(&response->list, &info->empty_packet_queue);
1105        info->count_empty_packet_queue++;
1106        spin_unlock(&info->empty_packet_queue_lock);
1107
1108        queue_work(info->workqueue, &info->post_send_credits_work);
1109}
1110
1111/*
1112 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
1113 * This is a queue for reassembling upper layer payload and presenting it to
1114 * the upper layer. All the incoming payload goes to the reassembly queue,
1115 * regardless of whether reassembly is required. The upper layer code reads
1116 * from the queue for all incoming payloads.
1117 * Put a received packet to the reassembly queue
1118 * response: the packet received
1119 * data_length: the size of payload in this packet
1120 */
1121static void enqueue_reassembly(
1122        struct smbd_connection *info,
1123        struct smbd_response *response,
1124        int data_length)
1125{
1126        spin_lock(&info->reassembly_queue_lock);
1127        list_add_tail(&response->list, &info->reassembly_queue);
1128        info->reassembly_queue_length++;
1129        /*
1130         * Make sure reassembly_data_length is updated after list and
1131         * reassembly_queue_length are updated. On the dequeue side
1132         * reassembly_data_length is checked without a lock to determine
1133         * if reassembly_queue_length and the list are up to date
1134         */
1135        virt_wmb();
1136        info->reassembly_data_length += data_length;
1137        spin_unlock(&info->reassembly_queue_lock);
1138        info->count_reassembly_queue++;
1139        info->count_enqueue_reassembly_queue++;
1140}
1141
1142/*
1143 * Get the first entry at the front of reassembly queue
1144 * Caller is responsible for locking
1145 * return value: the first entry if any, NULL if queue is empty
1146 */
1147static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1148{
1149        struct smbd_response *ret = NULL;
1150
1151        if (!list_empty(&info->reassembly_queue)) {
1152                ret = list_first_entry(
1153                        &info->reassembly_queue,
1154                        struct smbd_response, list);
1155        }
1156        return ret;
1157}
1158
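    /*
     * Take a buffer off the empty packet queue
     * return value: the buffer, or NULL if the queue is empty
     */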
1159static struct smbd_response *get_empty_queue_buffer(
1160                struct smbd_connection *info)
1161{
1162        struct smbd_response *ret = NULL;
1163        unsigned long flags;
1164
1165        spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1166        if (!list_empty(&info->empty_packet_queue)) {
1167                ret = list_first_entry(
1168                        &info->empty_packet_queue,
1169                        struct smbd_response, list);
1170                list_del(&ret->list);
1171                info->count_empty_packet_queue--;
1172        }
1173        spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1174
1175        return ret;
1176}
1177
1178/*
1179 * Get a receive buffer
1180 * For each remote send, we need to post a receive. The receive buffers are
1181 * allocated in advance.
1182 * return value: the receive buffer, NULL if none is available
1183 */
1184static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1185{
1186        struct smbd_response *ret = NULL;
1187        unsigned long flags;
1188
1189        spin_lock_irqsave(&info->receive_queue_lock, flags);
1190        if (!list_empty(&info->receive_queue)) {
1191                ret = list_first_entry(
1192                        &info->receive_queue,
1193                        struct smbd_response, list);
1194                list_del(&ret->list);
1195                info->count_receive_queue--;
1196                info->count_get_receive_buffer++;
1197        }
1198        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1199
1200        return ret;
1201}
1202
1203/*
1204 * Return a receive buffer
1205 * Upon returning a receive buffer, we can post a new receive and extend
1206 * more receive credits to remote peer. This is done immediately after a
1207 * receive buffer is returned.
1208 */
1209static void put_receive_buffer(
1210        struct smbd_connection *info, struct smbd_response *response)
1211{
1212        unsigned long flags;
1213
1214        ib_dma_unmap_single(info->id->device, response->sge.addr,
1215                response->sge.length, DMA_FROM_DEVICE);
1216
1217        spin_lock_irqsave(&info->receive_queue_lock, flags);
1218        list_add_tail(&response->list, &info->receive_queue);
1219        info->count_receive_queue++;
1220        info->count_put_receive_buffer++;
1221        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1222
1223        queue_work(info->workqueue, &info->post_send_credits_work);
1224}
1225
1226/* Preallocate all receive buffers at transport establishment */
1227static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1228{
1229        int i;
1230        struct smbd_response *response;
1231
1232        INIT_LIST_HEAD(&info->reassembly_queue);
1233        spin_lock_init(&info->reassembly_queue_lock);
1234        info->reassembly_data_length = 0;
1235        info->reassembly_queue_length = 0;
1236
1237        INIT_LIST_HEAD(&info->receive_queue);
1238        spin_lock_init(&info->receive_queue_lock);
1239        info->count_receive_queue = 0;
1240
1241        INIT_LIST_HEAD(&info->empty_packet_queue);
1242        spin_lock_init(&info->empty_packet_queue_lock);
1243        info->count_empty_packet_queue = 0;
1244
1245        init_waitqueue_head(&info->wait_receive_queues);
1246
1247        for (i = 0; i < num_buf; i++) {
1248                response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1249                if (!response)
1250                        goto allocate_failed;
1251
1252                response->info = info;
1253                list_add_tail(&response->list, &info->receive_queue);
1254                info->count_receive_queue++;
1255        }
1256
1257        return 0;
1258
1259allocate_failed:
1260        while (!list_empty(&info->receive_queue)) {
1261                response = list_first_entry(
1262                                &info->receive_queue,
1263                                struct smbd_response, list);
1264                list_del(&response->list);
1265                info->count_receive_queue--;
1266
1267                mempool_free(response, info->response_mempool);
1268        }
1269        return -ENOMEM;
1270}
1271
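    /*
     * Free all buffers on the receive and empty packet queues back to the
     * response mempool
     */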
1272static void destroy_receive_buffers(struct smbd_connection *info)
1273{
1274        struct smbd_response *response;
1275
1276        while ((response = get_receive_buffer(info)))
1277                mempool_free(response, info->response_mempool);
1278
1279        while ((response = get_empty_queue_buffer(info)))
1280                mempool_free(response, info->response_mempool);
1281}
1282
1283/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1284static void idle_connection_timer(struct work_struct *work)
1285{
1286        struct smbd_connection *info = container_of(
1287                                        work, struct smbd_connection,
1288                                        idle_timer_work.work);
1289
1290        if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1291                log_keep_alive(ERR,
1292                        "error status info->keep_alive_requested=%d\n",
1293                        info->keep_alive_requested);
1294                smbd_disconnect_rdma_connection(info);
1295                return;
1296        }
1297
1298        log_keep_alive(INFO, "about to send an empty idle message\n");
1299        smbd_post_send_empty(info);
1300
1301        /* Setup the next idle timeout work */
1302        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1303                        info->keep_alive_interval*HZ);
1304}
1305
1306/*
1307 * Destroy the transport and related RDMA and memory resources
1308 * Need to go through all the pending counters and make sure no one is using
1309 * the transport while it is destroyed
1310 */
1311void smbd_destroy(struct TCP_Server_Info *server)
1312{
1313        struct smbd_connection *info = server->smbd_conn;
1314        struct smbd_response *response;
1315        unsigned long flags;
1316
1317        if (!info) {
1318                log_rdma_event(INFO, "rdma session already destroyed\n");
1319                return;
1320        }
1321
1322        log_rdma_event(INFO, "destroying rdma session\n");
1323        if (info->transport_status != SMBD_DISCONNECTED) {
1324                rdma_disconnect(server->smbd_conn->id);
1325                log_rdma_event(INFO, "wait for transport being disconnected\n");
1326                wait_event_interruptible(
1327                        info->disconn_wait,
1328                        info->transport_status == SMBD_DISCONNECTED);
1329        }
1330
1331        log_rdma_event(INFO, "destroying qp\n");
1332        ib_drain_qp(info->id->qp);
1333        rdma_destroy_qp(info->id);
1334
1335        log_rdma_event(INFO, "cancelling idle timer\n");
1336        cancel_delayed_work_sync(&info->idle_timer_work);
1337
1338        log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
1339        wait_event(info->wait_send_pending,
1340                atomic_read(&info->send_pending) == 0);
1341
1342        /* It's not possible for the upper layer to get to reassembly */
1343        log_rdma_event(INFO, "drain the reassembly queue\n");
1344        do {
1345                spin_lock_irqsave(&info->reassembly_queue_lock, flags);
1346                response = _get_first_reassembly(info);
1347                if (response) {
1348                        list_del(&response->list);
1349                        spin_unlock_irqrestore(
1350                                &info->reassembly_queue_lock, flags);
1351                        put_receive_buffer(info, response);
1352                } else
1353                        spin_unlock_irqrestore(
1354                                &info->reassembly_queue_lock, flags);
1355        } while (response);
1356        info->reassembly_data_length = 0;
1357
1358        log_rdma_event(INFO, "free receive buffers\n");
1359        wait_event(info->wait_receive_queues,
1360                info->count_receive_queue + info->count_empty_packet_queue
1361                        == info->receive_credit_max);
1362        destroy_receive_buffers(info);
1363
1364        /*
1365         * For performance reasons, memory registration and deregistration
1366         * are not locked by srv_mutex. It is possible some processes are
1367         * blocked on transport srv_mutex while holding memory registration.
1368         * Release the transport srv_mutex to allow them to hit the failure
1369         * path when sending data, and then release memory registrations.
1370         */
1371        log_rdma_event(INFO, "freeing mr list\n");
1372        wake_up_interruptible_all(&info->wait_mr);
1373        while (atomic_read(&info->mr_used_count)) {
1374                mutex_unlock(&server->srv_mutex);
1375                msleep(1000);
1376                mutex_lock(&server->srv_mutex);
1377        }
1378        destroy_mr_list(info);
1379
1380        ib_free_cq(info->send_cq);
1381        ib_free_cq(info->recv_cq);
1382        ib_dealloc_pd(info->pd);
1383        rdma_destroy_id(info->id);
1384
1385        /* free mempools */
1386        mempool_destroy(info->request_mempool);
1387        kmem_cache_destroy(info->request_cache);
1388
1389        mempool_destroy(info->response_mempool);
1390        kmem_cache_destroy(info->response_cache);
1391
1392        info->transport_status = SMBD_DESTROYED;
1393
1394        destroy_workqueue(info->workqueue);
1395        log_rdma_event(INFO,  "rdma session destroyed\n");
1396        kfree(info);
1397}
1398
1399/*
1400 * Reconnect this SMBD connection, called from upper layer
1401 * return value: 0 on success, or actual error code
1402 */
1403int smbd_reconnect(struct TCP_Server_Info *server)
1404{
1405        log_rdma_event(INFO, "reconnecting rdma session\n");
1406
1407        if (!server->smbd_conn) {
1408                log_rdma_event(INFO, "rdma session already destroyed\n");
1409                goto create_conn;
1410        }
1411
1412        /*
1413         * This is possible if the transport is disconnected and we haven't
1414         * received the notification from RDMA, but the upper layer has detected a timeout
1415         */
1416        if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1417                log_rdma_event(INFO, "disconnecting transport\n");
1418                smbd_destroy(server);
1419        }
1420
1421create_conn:
1422        log_rdma_event(INFO, "creating rdma session\n");
1423        server->smbd_conn = smbd_get_connection(
1424                server, (struct sockaddr *) &server->dstaddr);
1425
1426        if (server->smbd_conn)
1427                cifs_dbg(VFS, "RDMA transport re-established\n");
1428
1429        return server->smbd_conn ? 0 : -ENOENT;
1430}
1431
1432static void destroy_caches_and_workqueue(struct smbd_connection *info)
1433{
1434        destroy_receive_buffers(info);
1435        destroy_workqueue(info->workqueue);
1436        mempool_destroy(info->response_mempool);
1437        kmem_cache_destroy(info->response_cache);
1438        mempool_destroy(info->request_mempool);
1439        kmem_cache_destroy(info->request_cache);
1440}
1441
1442#define MAX_NAME_LEN    80
1443static int allocate_caches_and_workqueue(struct smbd_connection *info)
1444{
1445        char name[MAX_NAME_LEN];
1446        int rc;
1447
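            /*
             * Request objects are sized to hold the SMBD data transfer header
             * right after struct smbd_request; response objects below reserve
             * max_receive_size bytes for the incoming packet
             */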
1448        scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1449        info->request_cache =
1450                kmem_cache_create(
1451                        name,
1452                        sizeof(struct smbd_request) +
1453                                sizeof(struct smbd_data_transfer),
1454                        0, SLAB_HWCACHE_ALIGN, NULL);
1455        if (!info->request_cache)
1456                return -ENOMEM;
1457
1458        info->request_mempool =
1459                mempool_create(info->send_credit_target, mempool_alloc_slab,
1460                        mempool_free_slab, info->request_cache);
1461        if (!info->request_mempool)
1462                goto out1;
1463
1464        scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1465        info->response_cache =
1466                kmem_cache_create(
1467                        name,
1468                        sizeof(struct smbd_response) +
1469                                info->max_receive_size,
1470                        0, SLAB_HWCACHE_ALIGN, NULL);
1471        if (!info->response_cache)
1472                goto out2;
1473
1474        info->response_mempool =
1475                mempool_create(info->receive_credit_max, mempool_alloc_slab,
1476                       mempool_free_slab, info->response_cache);
1477        if (!info->response_mempool)
1478                goto out3;
1479
1480        scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1481        info->workqueue = create_workqueue(name);
1482        if (!info->workqueue)
1483                goto out4;
1484
1485        rc = allocate_receive_buffers(info, info->receive_credit_max);
1486        if (rc) {
1487                log_rdma_event(ERR, "failed to allocate receive buffers\n");
1488                goto out5;
1489        }
1490
1491        return 0;
1492
1493out5:
1494        destroy_workqueue(info->workqueue);
1495out4:
1496        mempool_destroy(info->response_mempool);
1497out3:
1498        kmem_cache_destroy(info->response_cache);
1499out2:
1500        mempool_destroy(info->request_mempool);
1501out1:
1502        kmem_cache_destroy(info->request_cache);
1503        return -ENOMEM;
1504}
1505
1506/* Create a SMBD connection, called by upper layer */
1507static struct smbd_connection *_smbd_get_connection(
1508        struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1509{
1510        int rc;
1511        struct smbd_connection *info;
1512        struct rdma_conn_param conn_param;
1513        struct ib_qp_init_attr qp_attr;
1514        struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1515        struct ib_port_immutable port_immutable;
1516        u32 ird_ord_hdr[2];
1517
1518        info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1519        if (!info)
1520                return NULL;
1521
1522        info->transport_status = SMBD_CONNECTING;
1523        rc = smbd_ia_open(info, dstaddr, port);
1524        if (rc) {
1525                log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1526                goto create_id_failed;
1527        }
1528
1529        if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1530            smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1531                log_rdma_event(ERR, "consider lowering send_credit_target = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
1532                               smbd_send_credit_target,
1533                               info->id->device->attrs.max_cqe,
1534                               info->id->device->attrs.max_qp_wr);
1535                goto config_failed;
1536        }
1537
1538        if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1539            smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1540                log_rdma_event(ERR, "consider lowering receive_credit_max = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
1541                               smbd_receive_credit_max,
1542                               info->id->device->attrs.max_cqe,
1543                               info->id->device->attrs.max_qp_wr);
1544                goto config_failed;
1545        }
1546
1547        info->receive_credit_max = smbd_receive_credit_max;
1548        info->send_credit_target = smbd_send_credit_target;
1549        info->max_send_size = smbd_max_send_size;
1550        info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1551        info->max_receive_size = smbd_max_receive_size;
1552        info->keep_alive_interval = smbd_keep_alive_interval;
1553
1554        if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
1555                log_rdma_event(ERR,
1556                        "warning: device max_send_sge = %d too small\n",
1557                        info->id->device->attrs.max_send_sge);
1558                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1559        }
1560        if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
1561                log_rdma_event(ERR,
1562                        "warning: device max_recv_sge = %d too small\n",
1563                        info->id->device->attrs.max_recv_sge);
1564                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1565        }
1566
1567        info->send_cq = NULL;
1568        info->recv_cq = NULL;
1569        info->send_cq =
1570                ib_alloc_cq_any(info->id->device, info,
1571                                info->send_credit_target, IB_POLL_SOFTIRQ);
1572        if (IS_ERR(info->send_cq)) {
1573                info->send_cq = NULL;
1574                goto alloc_cq_failed;
1575        }
1576
1577        info->recv_cq =
1578                ib_alloc_cq_any(info->id->device, info,
1579                                info->receive_credit_max, IB_POLL_SOFTIRQ);
1580        if (IS_ERR(info->recv_cq)) {
1581                info->recv_cq = NULL;
1582                goto alloc_cq_failed;
1583        }
1584
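            /*
             * Create a reliable connected (RC) QP using the send and receive
             * CQs allocated above
             */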
1585        memset(&qp_attr, 0, sizeof(qp_attr));
1586        qp_attr.event_handler = smbd_qp_async_error_upcall;
1587        qp_attr.qp_context = info;
1588        qp_attr.cap.max_send_wr = info->send_credit_target;
1589        qp_attr.cap.max_recv_wr = info->receive_credit_max;
1590        qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1591        qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1592        qp_attr.cap.max_inline_data = 0;
1593        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1594        qp_attr.qp_type = IB_QPT_RC;
1595        qp_attr.send_cq = info->send_cq;
1596        qp_attr.recv_cq = info->recv_cq;
1597        qp_attr.port_num = ~0;
1598
1599        rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1600        if (rc) {
1601                log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1602                goto create_qp_failed;
1603        }
1604
1605        memset(&conn_param, 0, sizeof(conn_param));
1606        conn_param.initiator_depth = 0;
1607
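            /*
             * Cap responder resources at what the device supports for
             * incoming RDMA reads
             */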
1608        conn_param.responder_resources =
1609                info->id->device->attrs.max_qp_rd_atom
1610                        < SMBD_CM_RESPONDER_RESOURCES ?
1611                info->id->device->attrs.max_qp_rd_atom :
1612                SMBD_CM_RESPONDER_RESOURCES;
1613        info->responder_resources = conn_param.responder_resources;
1614        log_rdma_mr(INFO, "responder_resources=%d\n",
1615                info->responder_resources);
1616
1617        /* Need to send IRD/ORD in private data for iWARP */
1618        info->id->device->ops.get_port_immutable(
1619                info->id->device, info->id->port_num, &port_immutable);
1620        if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1621                ird_ord_hdr[0] = info->responder_resources;
1622                ird_ord_hdr[1] = 1;
1623                conn_param.private_data = ird_ord_hdr;
1624                conn_param.private_data_len = sizeof(ird_ord_hdr);
1625        } else {
1626                conn_param.private_data = NULL;
1627                conn_param.private_data_len = 0;
1628        }
1629
1630        conn_param.retry_count = SMBD_CM_RETRY;
1631        conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1632        conn_param.flow_control = 0;
1633
1634        log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1635                &addr_in->sin_addr, port);
1636
1637        init_waitqueue_head(&info->conn_wait);
1638        init_waitqueue_head(&info->disconn_wait);
1639        init_waitqueue_head(&info->wait_reassembly_queue);
1640        rc = rdma_connect(info->id, &conn_param);
1641        if (rc) {
1642                log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1643                goto rdma_connect_failed;
1644        }
1645
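            /* Block until the connect attempt succeeds or fails */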
1646        wait_event_interruptible(
1647                info->conn_wait, info->transport_status != SMBD_CONNECTING);
1648
1649        if (info->transport_status != SMBD_CONNECTED) {
1650                log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1651                goto rdma_connect_failed;
1652        }
1653
1654        log_rdma_event(INFO, "rdma_connect connected\n");
1655
1656        rc = allocate_caches_and_workqueue(info);
1657        if (rc) {
1658                log_rdma_event(ERR, "cache allocation failed\n");
1659                goto allocate_cache_failed;
1660        }
1661
1662        init_waitqueue_head(&info->wait_send_queue);
1663        INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1664        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1665                info->keep_alive_interval*HZ);
1666
1667        init_waitqueue_head(&info->wait_send_pending);
1668        atomic_set(&info->send_pending, 0);
1669
1670        init_waitqueue_head(&info->wait_post_send);
1671
1672        INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1673        INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1674        info->new_credits_offered = 0;
1675        spin_lock_init(&info->lock_new_credits_offered);
1676
1677        rc = smbd_negotiate(info);
1678        if (rc) {
1679                log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1680                goto negotiation_failed;
1681        }
1682
1683        rc = allocate_mr_list(info);
1684        if (rc) {
1685                log_rdma_mr(ERR, "memory registration allocation failed\n");
1686                goto allocate_mr_failed;
1687        }
1688
1689        return info;
1690
1691allocate_mr_failed:
1692        /* At this point, we need a full transport shutdown */
1693        smbd_destroy(server);
1694        return NULL;
1695
1696negotiation_failed:
1697        cancel_delayed_work_sync(&info->idle_timer_work);
1698        destroy_caches_and_workqueue(info);
1699        info->transport_status = SMBD_NEGOTIATE_FAILED;
1700        init_waitqueue_head(&info->conn_wait);
1701        rdma_disconnect(info->id);
1702        wait_event(info->conn_wait,
1703                info->transport_status == SMBD_DISCONNECTED);
1704
1705allocate_cache_failed:
1706rdma_connect_failed:
1707        rdma_destroy_qp(info->id);
1708
1709create_qp_failed:
1710alloc_cq_failed:
1711        if (info->send_cq)
1712                ib_free_cq(info->send_cq);
1713        if (info->recv_cq)
1714                ib_free_cq(info->recv_cq);
1715
1716config_failed:
1717        ib_dealloc_pd(info->pd);
1718        rdma_destroy_id(info->id);
1719
1720create_id_failed:
1721        kfree(info);
1722        return NULL;
1723}
1724
1725struct smbd_connection *smbd_get_connection(
1726        struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1727{
1728        struct smbd_connection *ret;
1729        int port = SMBD_PORT;
1730
1731try_again:
1732        ret = _smbd_get_connection(server, dstaddr, port);
1733
1734        /* Try SMB_PORT if SMBD_PORT doesn't work */
1735        if (!ret && port == SMBD_PORT) {
1736                port = SMB_PORT;
1737                goto try_again;
1738        }
1739        return ret;
1740}
1741
1742/*
1743 * Receive data from receive reassembly queue
1744 * All the incoming data packets are placed in reassembly queue
1745 * buf: the buffer to read data into
1746 * size: the length of data to read
1747 * return value: actual data read
1748 * Note: this implementation copies the data from reassembly queue to receive
1749 * buffers used by upper layer. This is not the optimal code path. A better way
1750 * to do it is to not have upper layer allocate its receive buffers but rather
1751 * borrow the buffer from reassembly queue, and return it after data is
1752 * consumed. But this will require more changes to upper layer code, and would
1753 * also need to consider packet boundaries while they are still being reassembled.
1754 */
1755static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1756                unsigned int size)
1757{
1758        struct smbd_response *response;
1759        struct smbd_data_transfer *data_transfer;
1760        int to_copy, to_read, data_read, offset;
1761        u32 data_length, remaining_data_length, data_offset;
1762        int rc;
1763
1764again:
1765        /*
1766         * No need to hold the reassembly queue lock all the time as we are
1767         * the only one reading from the front of the queue. The transport
1768         * may add more entries to the back of the queue at the same time
1769         */
1770        log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1771                info->reassembly_data_length);
1772        if (info->reassembly_data_length >= size) {
1773                int queue_length;
1774                int queue_removed = 0;
1775
1776                /*
1777                 * Need to make sure reassembly_data_length is read before
1778                 * reading reassembly_queue_length and calling
1779                 * _get_first_reassembly. This call is lock free
1780                 * as we never read the end of the queue, which is being
1781                 * updated in SOFTIRQ as more data is received
1782                 */
1783                virt_rmb();
1784                queue_length = info->reassembly_queue_length;
1785                data_read = 0;
1786                to_read = size;
1787                offset = info->first_entry_offset;
1788                while (data_read < size) {
1789                        response = _get_first_reassembly(info);
1790                        data_transfer = smbd_response_payload(response);
1791                        data_length = le32_to_cpu(data_transfer->data_length);
1792                        remaining_data_length =
1793                                le32_to_cpu(
1794                                        data_transfer->remaining_data_length);
1795                        data_offset = le32_to_cpu(data_transfer->data_offset);
1796
1797                        /*
1798                         * The upper layer expects RFC1002 length at the
1799                         * beginning of the payload. Return it to indicate
1800                         * the total length of the packet. This minimizes the
1801                         * change to upper layer packet processing logic. This
1802                         * will eventually be removed when an intermediate
1803                         * transport layer is added
1804                         */
1805                        if (response->first_segment && size == 4) {
1806                                unsigned int rfc1002_len =
1807                                        data_length + remaining_data_length;
1808                                *((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1809                                data_read = 4;
1810                                response->first_segment = false;
1811                                log_read(INFO, "returning rfc1002 length %d\n",
1812                                        rfc1002_len);
1813                                goto read_rfc1002_done;
1814                        }
1815
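                            /*
                             * Copy only as much of this segment as the caller
                             * still needs
                             */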
1816                        to_copy = min_t(int, data_length - offset, to_read);
1817                        memcpy(
1818                                buf + data_read,
1819                                (char *)data_transfer + data_offset + offset,
1820                                to_copy);
1821
1822                        /* move on to the next buffer? */
1823                        if (to_copy == data_length - offset) {
1824                                queue_length--;
1825                                /*
1826                                 * No need to lock if we are not at the
1827                                 * end of the queue
1828                                 */
1829                                if (queue_length)
1830                                        list_del(&response->list);
1831                                else {
1832                                        spin_lock_irq(
1833                                                &info->reassembly_queue_lock);
1834                                        list_del(&response->list);
1835                                        spin_unlock_irq(
1836                                                &info->reassembly_queue_lock);
1837                                }
1838                                queue_removed++;
1839                                info->count_reassembly_queue--;
1840                                info->count_dequeue_reassembly_queue++;
1841                                put_receive_buffer(info, response);
1842                                offset = 0;
1843                                log_read(INFO, "put_receive_buffer offset=0\n");
1844                        } else
1845                                offset += to_copy;
1846
1847                        to_read -= to_copy;
1848                        data_read += to_copy;
1849
1850                        log_read(INFO, "_get_first_reassembly memcpy %d bytes data_transfer_length-offset=%d after that to_read=%d data_read=%d offset=%d\n",
1851                                 to_copy, data_length - offset,
1852                                 to_read, data_read, offset);
1853                }
1854
1855                spin_lock_irq(&info->reassembly_queue_lock);
1856                info->reassembly_data_length -= data_read;
1857                info->reassembly_queue_length -= queue_removed;
1858                spin_unlock_irq(&info->reassembly_queue_lock);
1859
1860                info->first_entry_offset = offset;
1861                log_read(INFO, "returning to thread data_read=%d reassembly_data_length=%d first_entry_offset=%d\n",
1862                         data_read, info->reassembly_data_length,
1863                         info->first_entry_offset);
1864read_rfc1002_done:
1865                return data_read;
1866        }
1867
1868        log_read(INFO, "wait_event on more data\n");
1869        rc = wait_event_interruptible(
1870                info->wait_reassembly_queue,
1871                info->reassembly_data_length >= size ||
1872                        info->transport_status != SMBD_CONNECTED);
1873        /* Don't return any data if interrupted */
1874        if (rc)
1875                return rc;
1876
1877        if (info->transport_status != SMBD_CONNECTED) {
1878                log_read(ERR, "disconnected\n");
1879                return -ECONNABORTED;
1880        }
1881
1882        goto again;
1883}
1884
1885/*
1886 * Receive a page from receive reassembly queue
1887 * page: the page to read data into
1888 * to_read: the length of data to read
1889 * return value: actual data read
1890 */
1891static int smbd_recv_page(struct smbd_connection *info,
1892                struct page *page, unsigned int page_offset,
1893                unsigned int to_read)
1894{
1895        int ret;
1896        char *to_address;
1897        void *page_address;
1898
1899        /* make sure we have the page ready for read */
1900        ret = wait_event_interruptible(
1901                info->wait_reassembly_queue,
1902                info->reassembly_data_length >= to_read ||
1903                        info->transport_status != SMBD_CONNECTED);
1904        if (ret)
1905                return ret;
1906
1907        /* now we can read from reassembly queue and not sleep */
1908        page_address = kmap_atomic(page);
1909        to_address = (char *) page_address + page_offset;
1910
1911        log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
1912                page, to_address, to_read);
1913
1914        ret = smbd_recv_buf(info, to_address, to_read);
1915        kunmap_atomic(page_address);
1916
1917        return ret;
1918}
1919
1920/*
1921 * Receive data from transport
1922 * msg: a msghdr pointing to the buffer, can be ITER_KVEC or ITER_BVEC
1923 * return: total bytes read, or 0. SMB Direct will not do partial read.
1924 */
1925int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
1926{
1927        char *buf;
1928        struct page *page;
1929        unsigned int to_read, page_offset;
1930        int rc;
1931
1932        if (iov_iter_rw(&msg->msg_iter) == WRITE) {
1933                /* It's a bug in the upper layer to get here */
1934                cifs_dbg(VFS, "Invalid msg iter dir %u\n",
1935                         iov_iter_rw(&msg->msg_iter));
1936                rc = -EINVAL;
1937                goto out;
1938        }
1939
1940        switch (iov_iter_type(&msg->msg_iter)) {
1941        case ITER_KVEC:
1942                buf = msg->msg_iter.kvec->iov_base;
1943                to_read = msg->msg_iter.kvec->iov_len;
1944                rc = smbd_recv_buf(info, buf, to_read);
1945                break;
1946
1947        case ITER_BVEC:
1948                page = msg->msg_iter.bvec->bv_page;
1949                page_offset = msg->msg_iter.bvec->bv_offset;
1950                to_read = msg->msg_iter.bvec->bv_len;
1951                rc = smbd_recv_page(info, page, page_offset, to_read);
1952                break;
1953
1954        default:
1955                /* It's a bug in the upper layer to get here */
1956                cifs_dbg(VFS, "Invalid msg type %d\n",
1957                         iov_iter_type(&msg->msg_iter));
1958                rc = -EINVAL;
1959        }
1960
1961out:
1962        /* SMBDirect will read it all or nothing */
1963        if (rc > 0)
1964                msg->msg_iter.count = 0;
1965        return rc;
1966}
1967
1968/*
1969 * Send data to transport
1970 * Each rqst is transported as a SMBDirect payload
1971 * rqst: the data to write
1972 * return value: 0 on successful write, otherwise error code
1973 */
1974int smbd_send(struct TCP_Server_Info *server,
1975        int num_rqst, struct smb_rqst *rqst_array)
1976{
1977        struct smbd_connection *info = server->smbd_conn;
1978        struct kvec vec;
1979        int nvecs;
1980        int size;
1981        unsigned int buflen, remaining_data_length;
1982        int start, i, j;
1983        int max_iov_size =
1984                info->max_send_size - sizeof(struct smbd_data_transfer);
1985        struct kvec *iov;
1986        int rc;
1987        struct smb_rqst *rqst;
1988        int rqst_idx;
1989
1990        if (info->transport_status != SMBD_CONNECTED) {
1991                rc = -EAGAIN;
1992                goto done;
1993        }
1994
1995        /*
1996         * Add in the page array if there is one. The caller needs to set
1997         * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
1998         * ends at page boundary
1999         */
2000        remaining_data_length = 0;
2001        for (i = 0; i < num_rqst; i++)
2002                remaining_data_length += smb_rqst_len(server, &rqst_array[i]);
2003
2004        if (remaining_data_length > info->max_fragmented_send_size) {
2005                log_write(ERR, "payload size %d > max size %d\n",
2006                        remaining_data_length, info->max_fragmented_send_size);
2007                rc = -EINVAL;
2008                goto done;
2009        }
2010
2011        log_write(INFO, "num_rqst=%d total length=%u\n",
2012                        num_rqst, remaining_data_length);
2013
2014        rqst_idx = 0;
2015next_rqst:
2016        rqst = &rqst_array[rqst_idx];
2017        iov = rqst->rq_iov;
2018
2019        cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
2020                rqst_idx, smb_rqst_len(server, rqst));
2021        for (i = 0; i < rqst->rq_nvec; i++)
2022                dump_smb(iov[i].iov_base, iov[i].iov_len);
2023
2024
2025        log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d rq_tailsz=%d buflen=%lu\n",
2026                  rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2027                  rqst->rq_tailsz, smb_rqst_len(server, rqst));
2028
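            /*
             * Walk rq_iov, coalescing vectors into sends no larger than
             * max_iov_size; a single vector bigger than max_iov_size is
             * split across multiple sends
             */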
2029        start = i = 0;
2030        buflen = 0;
2031        while (true) {
2032                buflen += iov[i].iov_len;
2033                if (buflen > max_iov_size) {
2034                        if (i > start) {
2035                                remaining_data_length -=
2036                                        (buflen-iov[i].iov_len);
2037                                log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2038                                          start, i, i - start,
2039                                          remaining_data_length);
2040                                rc = smbd_post_send_data(
2041                                        info, &iov[start], i-start,
2042                                        remaining_data_length);
2043                                if (rc)
2044                                        goto done;
2045                        } else {
2046                                /* iov[start] is too big, break it */
2047                                nvecs = (buflen+max_iov_size-1)/max_iov_size;
2048                                log_write(INFO, "iov[%d] iov_base=%p buflen=%d break to %d vectors\n",
2049                                          start, iov[start].iov_base,
2050                                          buflen, nvecs);
2051                                for (j = 0; j < nvecs; j++) {
2052                                        vec.iov_base =
2053                                                (char *)iov[start].iov_base +
2054                                                j*max_iov_size;
2055                                        vec.iov_len = max_iov_size;
2056                                        if (j == nvecs-1)
2057                                                vec.iov_len =
2058                                                        buflen -
2059                                                        max_iov_size*(nvecs-1);
2060                                        remaining_data_length -= vec.iov_len;
2061                                        log_write(INFO,
2062                                                "sending vec j=%d iov_base=%p iov_len=%zu remaining_data_length=%d\n",
2063                                                  j, vec.iov_base, vec.iov_len,
2064                                                  remaining_data_length);
2065                                        rc = smbd_post_send_data(
2066                                                info, &vec, 1,
2067                                                remaining_data_length);
2068                                        if (rc)
2069                                                goto done;
2070                                }
2071                                i++;
2072                                if (i == rqst->rq_nvec)
2073                                        break;
2074                        }
2075                        start = i;
2076                        buflen = 0;
2077                } else {
2078                        i++;
2079                        if (i == rqst->rq_nvec) {
2080                                /* send out all remaining vecs */
2081                                remaining_data_length -= buflen;
2082                                log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2083                                          start, i, i - start,
2084                                          remaining_data_length);
2085                                rc = smbd_post_send_data(info, &iov[start],
2086                                        i-start, remaining_data_length);
2087                                if (rc)
2088                                        goto done;
2089                                break;
2090                        }
2091                }
2092                log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2093        }
2094
2095        /* now sending pages if there are any */
2096        for (i = 0; i < rqst->rq_npages; i++) {
2097                unsigned int offset;
2098
2099                rqst_page_get_length(rqst, i, &buflen, &offset);
2100                nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2101                log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2102                        buflen, nvecs);
2103                for (j = 0; j < nvecs; j++) {
2104                        size = max_iov_size;
2105                        if (j == nvecs-1)
2106                                size = buflen - j*max_iov_size;
2107                        remaining_data_length -= size;
2108                        log_write(INFO, "sending pages i=%d offset=%d size=%d remaining_data_length=%d\n",
2109                                  i, j * max_iov_size + offset, size,
2110                                  remaining_data_length);
2111                        rc = smbd_post_send_page(
2112                                info, rqst->rq_pages[i],
2113                                j*max_iov_size + offset,
2114                                size, remaining_data_length);
2115                        if (rc)
2116                                goto done;
2117                }
2118        }
2119
2120        rqst_idx++;
2121        if (rqst_idx < num_rqst)
2122                goto next_rqst;
2123
2124done:
2125        /*
2126         * As an optimization, we don't wait for individual I/O to finish
2127         * before sending the next one.
2128         * Send them all and wait for the pending send count to get to 0,
2129         * which means all the I/Os have been sent out and we are good to return
2130         */
2131
2132        wait_event(info->wait_send_pending,
2133                atomic_read(&info->send_pending) == 0);
2134
2135        return rc;
2136}
2137
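    /*
     * Completion of the IB_WR_REG_MR work request posted in smbd_register_mr()
     * A failed registration is handled by disconnecting the RDMA connection
     */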
2138static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2139{
2140        struct smbd_mr *mr;
2141        struct ib_cqe *cqe;
2142
2143        if (wc->status) {
2144                log_rdma_mr(ERR, "status=%d\n", wc->status);
2145                cqe = wc->wr_cqe;
2146                mr = container_of(cqe, struct smbd_mr, cqe);
2147                smbd_disconnect_rdma_connection(mr->conn);
2148        }
2149}
2150
2151/*
2152 * The work queue function that recovers MRs
2153 * We need to call ib_dereg_mr() and ib_alloc_mr() before an MR can be used
2154 * again. Both calls are slow, so finish them in a workqueue. This will not
2155 * block the I/O path.
2156 * There is only one workqueue recovering MRs, so there is no need to lock as
2157 * the I/O requests calling smbd_register_mr will never update the links in the
2158 * mr_list.
2159 */
2160static void smbd_mr_recovery_work(struct work_struct *work)
2161{
2162        struct smbd_connection *info =
2163                container_of(work, struct smbd_connection, mr_recovery_work);
2164        struct smbd_mr *smbdirect_mr;
2165        int rc;
2166
2167        list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2168                if (smbdirect_mr->state == MR_ERROR) {
2169
2170                        /* recover this MR entry */
2171                        rc = ib_dereg_mr(smbdirect_mr->mr);
2172                        if (rc) {
2173                                log_rdma_mr(ERR,
2174                                        "ib_dereg_mr failed rc=%x\n",
2175                                        rc);
2176                                smbd_disconnect_rdma_connection(info);
2177                                continue;
2178                        }
2179
2180                        smbdirect_mr->mr = ib_alloc_mr(
2181                                info->pd, info->mr_type,
2182                                info->max_frmr_depth);
2183                        if (IS_ERR(smbdirect_mr->mr)) {
2184                                log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2185                                            info->mr_type,
2186                                            info->max_frmr_depth);
2187                                smbd_disconnect_rdma_connection(info);
2188                                continue;
2189                        }
2190                } else
2191                        /* This MR is being used, don't recover it */
2192                        continue;
2193
2194                smbdirect_mr->state = MR_READY;
2195
2196                /* smbdirect_mr->state is updated by this function
2197                 * and is read and updated by I/O issuing CPUs trying
2198                 * to get an MR; the call to atomic_inc_return
2199                 * implies a memory barrier and guarantees this
2200                 * value is updated before waking up any calls to
2201                 * get_mr() from the I/O issuing CPUs
2202                 */
2203                if (atomic_inc_return(&info->mr_ready_count) == 1)
2204                        wake_up_interruptible(&info->wait_mr);
2205        }
2206}
2207
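    /*
     * Release all MRs when the transport is torn down. MRs still in
     * MR_INVALIDATED state hold a DMA mapping that must be unmapped first
     */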
2208static void destroy_mr_list(struct smbd_connection *info)
2209{
2210        struct smbd_mr *mr, *tmp;
2211
2212        cancel_work_sync(&info->mr_recovery_work);
2213        list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2214                if (mr->state == MR_INVALIDATED)
2215                        ib_dma_unmap_sg(info->id->device, mr->sgl,
2216                                mr->sgl_count, mr->dir);
2217                ib_dereg_mr(mr->mr);
2218                kfree(mr->sgl);
2219                kfree(mr);
2220        }
2221}
2222
2223/*
2224 * Allocate MRs used for RDMA read/write
2225 * The number of MRs will not exceed hardware capability in responder_resources
2226 * All MRs are kept in mr_list. The MR can be recovered after it's used
2227 * Recovery is done in smbd_mr_recovery_work. The content of list entry changes
2228 * as MRs are used and recovered for I/O, but the list links will not change
2229 */
2230static int allocate_mr_list(struct smbd_connection *info)
2231{
2232        int i;
2233        struct smbd_mr *smbdirect_mr, *tmp;
2234
2235        INIT_LIST_HEAD(&info->mr_list);
2236        init_waitqueue_head(&info->wait_mr);
2237        spin_lock_init(&info->mr_list_lock);
2238        atomic_set(&info->mr_ready_count, 0);
2239        atomic_set(&info->mr_used_count, 0);
2240        init_waitqueue_head(&info->wait_for_mr_cleanup);
2241        /* Allocate more MRs (2x) than hardware responder_resources */
2242        for (i = 0; i < info->responder_resources * 2; i++) {
2243                smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2244                if (!smbdirect_mr)
2245                        goto out;
2246                smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2247                                        info->max_frmr_depth);
2248                if (IS_ERR(smbdirect_mr->mr)) {
2249                        log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2250                                    info->mr_type, info->max_frmr_depth);
2251                        goto out;
2252                }
2253                smbdirect_mr->sgl = kcalloc(
2254                                        info->max_frmr_depth,
2255                                        sizeof(struct scatterlist),
2256                                        GFP_KERNEL);
2257                if (!smbdirect_mr->sgl) {
2258                        log_rdma_mr(ERR, "failed to allocate sgl\n");
2259                        ib_dereg_mr(smbdirect_mr->mr);
2260                        goto out;
2261                }
2262                smbdirect_mr->state = MR_READY;
2263                smbdirect_mr->conn = info;
2264
2265                list_add_tail(&smbdirect_mr->list, &info->mr_list);
2266                atomic_inc(&info->mr_ready_count);
2267        }
2268        INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2269        return 0;
2270
2271out:
2272        kfree(smbdirect_mr);
2273
2274        list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2275                ib_dereg_mr(smbdirect_mr->mr);
2276                kfree(smbdirect_mr->sgl);
2277                kfree(smbdirect_mr);
2278        }
2279        return -ENOMEM;
2280}
2281
2282/*
2283 * Get an MR from mr_list. This function waits until there is at least one
2284 * MR available in the list. It may access the list while the
2285 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2286 * as they never modify the same places. However, there may be several CPUs
2287 * issuing I/O trying to get an MR at the same time; mr_list_lock is used to
2288 * protect against this situation.
2289 */
2290static struct smbd_mr *get_mr(struct smbd_connection *info)
2291{
2292        struct smbd_mr *ret;
2293        int rc;
2294again:
2295        rc = wait_event_interruptible(info->wait_mr,
2296                atomic_read(&info->mr_ready_count) ||
2297                info->transport_status != SMBD_CONNECTED);
2298        if (rc) {
2299                log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2300                return NULL;
2301        }
2302
2303        if (info->transport_status != SMBD_CONNECTED) {
2304                log_rdma_mr(ERR, "info->transport_status=%x\n",
2305                        info->transport_status);
2306                return NULL;
2307        }
2308
2309        spin_lock(&info->mr_list_lock);
2310        list_for_each_entry(ret, &info->mr_list, list) {
2311                if (ret->state == MR_READY) {
2312                        ret->state = MR_REGISTERED;
2313                        spin_unlock(&info->mr_list_lock);
2314                        atomic_dec(&info->mr_ready_count);
2315                        atomic_inc(&info->mr_used_count);
2316                        return ret;
2317                }
2318        }
2319
2320        spin_unlock(&info->mr_list_lock);
2321        /*
2322         * It is possible that we could fail to get an MR because other processes
2323         * may try to acquire an MR at the same time. If this is the case, retry.
2324         */
2325        goto again;
2326}
2327
2328/*
2329 * Register memory for RDMA read/write
2330 * pages[]: the list of pages to register memory with
2331 * num_pages: the number of pages to register
2332 * tailsz: if non-zero, the bytes to register in the last page
2333 * writing: true if this is an RDMA write (SMB read), false for RDMA read
2334 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2335 * return value: the MR registered, NULL if failed.
2336 */
2337struct smbd_mr *smbd_register_mr(
2338        struct smbd_connection *info, struct page *pages[], int num_pages,
2339        int offset, int tailsz, bool writing, bool need_invalidate)
2340{
2341        struct smbd_mr *smbdirect_mr;
2342        int rc, i;
2343        enum dma_data_direction dir;
2344        struct ib_reg_wr *reg_wr;
2345
2346        if (num_pages > info->max_frmr_depth) {
2347                log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2348                        num_pages, info->max_frmr_depth);
2349                return NULL;
2350        }
2351
2352        smbdirect_mr = get_mr(info);
2353        if (!smbdirect_mr) {
2354                log_rdma_mr(ERR, "get_mr returning NULL\n");
2355                return NULL;
2356        }
2357        smbdirect_mr->need_invalidate = need_invalidate;
2358        smbdirect_mr->sgl_count = num_pages;
2359        sg_init_table(smbdirect_mr->sgl, num_pages);
2360
2361        log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
2362                        num_pages, offset, tailsz);
2363
2364        if (num_pages == 1) {
2365                sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
2366                goto skip_multiple_pages;
2367        }
2368
2369        /* We have at least two pages to register */
2370        sg_set_page(
2371                &smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
2372        i = 1;
2373        while (i < num_pages - 1) {
2374                sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2375                i++;
2376        }
2377        sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2378                tailsz ? tailsz : PAGE_SIZE, 0);
2379
2380skip_multiple_pages:
2381        dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2382        smbdirect_mr->dir = dir;
2383        rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2384        if (!rc) {
2385                log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2386                        num_pages, dir, rc);
2387                goto dma_map_error;
2388        }
2389
2390        rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2391                NULL, PAGE_SIZE);
2392        if (rc != num_pages) {
2393                log_rdma_mr(ERR,
2394                        "ib_map_mr_sg failed rc = %d num_pages = %x\n",
2395                        rc, num_pages);
2396                goto map_mr_error;
2397        }
2398
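            /* Advance the MR rkey, then build the IB_WR_REG_MR work request */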
2399        ib_update_fast_reg_key(smbdirect_mr->mr,
2400                ib_inc_rkey(smbdirect_mr->mr->rkey));
2401        reg_wr = &smbdirect_mr->wr;
2402        reg_wr->wr.opcode = IB_WR_REG_MR;
2403        smbdirect_mr->cqe.done = register_mr_done;
2404        reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2405        reg_wr->wr.num_sge = 0;
2406        reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2407        reg_wr->mr = smbdirect_mr->mr;
2408        reg_wr->key = smbdirect_mr->mr->rkey;
2409        reg_wr->access = writing ?
2410                        IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2411                        IB_ACCESS_REMOTE_READ;
2412
2413        /*
2414         * There is no need to wait for completion of ib_post_send
2415         * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2416         * on the next ib_post_send when we actually send I/O to the remote peer
2417         */
2418        rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
2419        if (!rc)
2420                return smbdirect_mr;
2421
2422        log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2423                rc, reg_wr->key);
2424
2425        /* If all failed, attempt to recover this MR by setting it to MR_ERROR */
2426map_mr_error:
2427        ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2428                smbdirect_mr->sgl_count, smbdirect_mr->dir);
2429
2430dma_map_error:
2431        smbdirect_mr->state = MR_ERROR;
2432        if (atomic_dec_and_test(&info->mr_used_count))
2433                wake_up(&info->wait_for_mr_cleanup);
2434
2435        smbd_disconnect_rdma_connection(info);
2436
2437        return NULL;
2438}
2439
2440static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2441{
2442        struct smbd_mr *smbdirect_mr;
2443        struct ib_cqe *cqe;
2444
2445        cqe = wc->wr_cqe;
2446        smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2447        smbdirect_mr->state = MR_INVALIDATED;
2448        if (wc->status != IB_WC_SUCCESS) {
2449                log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2450                smbdirect_mr->state = MR_ERROR;
2451        }
2452        complete(&smbdirect_mr->invalidate_done);
2453}
2454
2455/*
2456 * Deregister an MR after I/O is done
2457 * This function may wait if remote invalidation is not used
2458 * and we have to locally invalidate the buffer to prevent data from being
2459 * modified by the remote peer after the upper layer consumes it
2460 */
2461int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2462{
2463        struct ib_send_wr *wr;
2464        struct smbd_connection *info = smbdirect_mr->conn;
2465        int rc = 0;
2466
2467        if (smbdirect_mr->need_invalidate) {
2468                /* Need to finish local invalidation before returning */
2469                wr = &smbdirect_mr->inv_wr;
2470                wr->opcode = IB_WR_LOCAL_INV;
2471                smbdirect_mr->cqe.done = local_inv_done;
2472                wr->wr_cqe = &smbdirect_mr->cqe;
2473                wr->num_sge = 0;
2474                wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2475                wr->send_flags = IB_SEND_SIGNALED;
2476
2477                init_completion(&smbdirect_mr->invalidate_done);
2478                rc = ib_post_send(info->id->qp, wr, NULL);
2479                if (rc) {
2480                        log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2481                        smbd_disconnect_rdma_connection(info);
2482                        goto done;
2483                }
2484                wait_for_completion(&smbdirect_mr->invalidate_done);
2485                smbdirect_mr->need_invalidate = false;
2486        } else
2487                /*
2488                 * For remote invalidation, just set it to MR_INVALIDATED
2489                 * and defer to mr_recovery_work to recover the MR for next use
2490                 */
2491                smbdirect_mr->state = MR_INVALIDATED;
2492
2493        if (smbdirect_mr->state == MR_INVALIDATED) {
2494                ib_dma_unmap_sg(
2495                        info->id->device, smbdirect_mr->sgl,
2496                        smbdirect_mr->sgl_count,
2497                        smbdirect_mr->dir);
2498                smbdirect_mr->state = MR_READY;
2499                if (atomic_inc_return(&info->mr_ready_count) == 1)
2500                        wake_up_interruptible(&info->wait_mr);
2501        } else
2502                /*
2503                 * Schedule the work to do MR recovery for future I/Os. MR
2504                 * recovery is slow and we don't want it to block the current I/O
2505                 */
2506                queue_work(info->workqueue, &info->mr_recovery_work);
2507
2508done:
2509        if (atomic_dec_and_test(&info->mr_used_count))
2510                wake_up(&info->wait_for_mr_cleanup);
2511
2512        return rc;
2513}
2514