linux/fs/cifs/smbdirect.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 *   Copyright (C) 2017, Microsoft Corporation.
   4 *
   5 *   Author(s): Long Li <longli@microsoft.com>
   6 */
   7#include <linux/module.h>
   8#include <linux/highmem.h>
   9#include "smbdirect.h"
  10#include "cifs_debug.h"
  11#include "cifsproto.h"
  12#include "smb2proto.h"
  13
  14static struct smbd_response *get_empty_queue_buffer(
  15                struct smbd_connection *info);
  16static struct smbd_response *get_receive_buffer(
  17                struct smbd_connection *info);
  18static void put_receive_buffer(
  19                struct smbd_connection *info,
  20                struct smbd_response *response);
  21static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  22static void destroy_receive_buffers(struct smbd_connection *info);
  23
  24static void put_empty_packet(
  25                struct smbd_connection *info, struct smbd_response *response);
  26static void enqueue_reassembly(
  27                struct smbd_connection *info,
  28                struct smbd_response *response, int data_length);
  29static struct smbd_response *_get_first_reassembly(
  30                struct smbd_connection *info);
  31
  32static int smbd_post_recv(
  33                struct smbd_connection *info,
  34                struct smbd_response *response);
  35
  36static int smbd_post_send_empty(struct smbd_connection *info);
  37static int smbd_post_send_data(
  38                struct smbd_connection *info,
  39                struct kvec *iov, int n_vec, int remaining_data_length);
  40static int smbd_post_send_page(struct smbd_connection *info,
  41                struct page *page, unsigned long offset,
  42                size_t size, int remaining_data_length);
  43
  44static void destroy_mr_list(struct smbd_connection *info);
  45static int allocate_mr_list(struct smbd_connection *info);
  46
  47/* SMBD version number */
  48#define SMBD_V1 0x0100
  49
  50/* Port numbers for SMBD transport */
  51#define SMB_PORT        445
  52#define SMBD_PORT       5445
  53
  54/* Address lookup and resolve timeout in ms */
  55#define RDMA_RESOLVE_TIMEOUT    5000
  56
  57/* SMBD negotiation timeout in seconds */
  58#define SMBD_NEGOTIATE_TIMEOUT  120
  59
   60/* SMBD minimum receive size and fragmented size as defined in [MS-SMBD] */
  61#define SMBD_MIN_RECEIVE_SIZE           128
  62#define SMBD_MIN_FRAGMENTED_SIZE        131072
  63
  64/*
   65 * Default maximum number of outstanding RDMA read/write operations on this connection
   66 * This value may be decreased during QP creation, based on hardware limits
  67 */
  68#define SMBD_CM_RESPONDER_RESOURCES     32
  69
  70/* Maximum number of retries on data transfer operations */
  71#define SMBD_CM_RETRY                   6
  72/* No need to retry on Receiver Not Ready since SMBD manages credits */
  73#define SMBD_CM_RNR_RETRY               0
  74
  75/*
  76 * User configurable initial values per SMBD transport connection
  77 * as defined in [MS-SMBD] 3.1.1.1
   78 * These may change after SMBD negotiation
  79 */
   80/* The local peer's maximum number of credits to grant to the remote peer */
  81int smbd_receive_credit_max = 255;
  82
   83/* The number of credits the local peer requests from the remote peer */
  84int smbd_send_credit_target = 255;
  85
   86/* The maximum size of a single message that can be sent to the remote peer */
  87int smbd_max_send_size = 1364;
  88
  89/*  The maximum fragmented upper-layer payload receive size supported */
  90int smbd_max_fragmented_recv_size = 1024 * 1024;
  91
  92/*  The maximum single-message size which can be received */
  93int smbd_max_receive_size = 8192;
  94
  95/* The timeout to initiate send of a keepalive message on idle */
  96int smbd_keep_alive_interval = 120;
  97
  98/*
  99 * User configurable initial values for RDMA transport
 100 * The actual values used may be lower and are limited to hardware capabilities
 101 */
  103/* Default maximum number of SGEs in an RDMA write/read */
 103int smbd_max_frmr_depth = 2048;
 104
  105/* If payload is less than this many bytes, use RDMA send/recv instead of read/write */
 106int rdma_readwrite_threshold = 4096;
 107
 108/* Transport logging functions
  109 * Log messages are grouped into classes, which can be OR'ed to define the
  110 * actual logging level via the module parameter smbd_logging_class
 111 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 112 * log_rdma_event()
 113 */
 114#define LOG_OUTGOING                    0x1
 115#define LOG_INCOMING                    0x2
 116#define LOG_READ                        0x4
 117#define LOG_WRITE                       0x8
 118#define LOG_RDMA_SEND                   0x10
 119#define LOG_RDMA_RECV                   0x20
 120#define LOG_KEEP_ALIVE                  0x40
 121#define LOG_RDMA_EVENT                  0x80
 122#define LOG_RDMA_MR                     0x100
 123static unsigned int smbd_logging_class;
 124module_param(smbd_logging_class, uint, 0644);
 125MODULE_PARM_DESC(smbd_logging_class,
 126        "Logging class for SMBD transport 0x0 to 0x100");
 127
 128#define ERR             0x0
 129#define INFO            0x1
 130static unsigned int smbd_logging_level = ERR;
 131module_param(smbd_logging_level, uint, 0644);
 132MODULE_PARM_DESC(smbd_logging_level,
 133        "Logging level for SMBD transport, 0 (default): error, 1: info");
 134
 135#define log_rdma(level, class, fmt, args...)                            \
 136do {                                                                    \
 137        if (level <= smbd_logging_level || class & smbd_logging_class)  \
 138                cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 139} while (0)
 140
 141#define log_outgoing(level, fmt, args...) \
 142                log_rdma(level, LOG_OUTGOING, fmt, ##args)
 143#define log_incoming(level, fmt, args...) \
 144                log_rdma(level, LOG_INCOMING, fmt, ##args)
 145#define log_read(level, fmt, args...)   log_rdma(level, LOG_READ, fmt, ##args)
 146#define log_write(level, fmt, args...)  log_rdma(level, LOG_WRITE, fmt, ##args)
 147#define log_rdma_send(level, fmt, args...) \
 148                log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 149#define log_rdma_recv(level, fmt, args...) \
 150                log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 151#define log_keep_alive(level, fmt, args...) \
 152                log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 153#define log_rdma_event(level, fmt, args...) \
 154                log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 155#define log_rdma_mr(level, fmt, args...) \
 156                log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 157
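     /*
      * Work handler to tear down the RDMA connection: move a connected
      * transport to SMBD_DISCONNECTING and call rdma_disconnect(); the RDMA CM
      * then reports the disconnect back through smbd_conn_upcall()
      */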
 158static void smbd_disconnect_rdma_work(struct work_struct *work)
 159{
 160        struct smbd_connection *info =
 161                container_of(work, struct smbd_connection, disconnect_work);
 162
 163        if (info->transport_status == SMBD_CONNECTED) {
 164                info->transport_status = SMBD_DISCONNECTING;
 165                rdma_disconnect(info->id);
 166        }
 167}
 168
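     /* Schedule the disconnect work above on the connection's workqueue */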
 169static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 170{
 171        queue_work(info->workqueue, &info->disconnect_work);
 172}
 173
 174/* Upcall from RDMA CM */
 175static int smbd_conn_upcall(
 176                struct rdma_cm_id *id, struct rdma_cm_event *event)
 177{
 178        struct smbd_connection *info = id->context;
 179
 180        log_rdma_event(INFO, "event=%d status=%d\n",
 181                event->event, event->status);
 182
 183        switch (event->event) {
 184        case RDMA_CM_EVENT_ADDR_RESOLVED:
 185        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 186                info->ri_rc = 0;
 187                complete(&info->ri_done);
 188                break;
 189
 190        case RDMA_CM_EVENT_ADDR_ERROR:
 191                info->ri_rc = -EHOSTUNREACH;
 192                complete(&info->ri_done);
 193                break;
 194
 195        case RDMA_CM_EVENT_ROUTE_ERROR:
 196                info->ri_rc = -ENETUNREACH;
 197                complete(&info->ri_done);
 198                break;
 199
 200        case RDMA_CM_EVENT_ESTABLISHED:
 201                log_rdma_event(INFO, "connected event=%d\n", event->event);
 202                info->transport_status = SMBD_CONNECTED;
 203                wake_up_interruptible(&info->conn_wait);
 204                break;
 205
 206        case RDMA_CM_EVENT_CONNECT_ERROR:
 207        case RDMA_CM_EVENT_UNREACHABLE:
 208        case RDMA_CM_EVENT_REJECTED:
 209                log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 210                info->transport_status = SMBD_DISCONNECTED;
 211                wake_up_interruptible(&info->conn_wait);
 212                break;
 213
 214        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 215        case RDMA_CM_EVENT_DISCONNECTED:
  216                /* This happens when we fail the negotiation */
 217                if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 218                        info->transport_status = SMBD_DISCONNECTED;
 219                        wake_up(&info->conn_wait);
 220                        break;
 221                }
 222
 223                info->transport_status = SMBD_DISCONNECTED;
 224                wake_up_interruptible(&info->disconn_wait);
 225                wake_up_interruptible(&info->wait_reassembly_queue);
 226                wake_up_interruptible_all(&info->wait_send_queue);
 227                break;
 228
 229        default:
 230                break;
 231        }
 232
 233        return 0;
 234}
 235
 236/* Upcall from RDMA QP */
 237static void
 238smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 239{
 240        struct smbd_connection *info = context;
 241
 242        log_rdma_event(ERR, "%s on device %s info %p\n",
 243                ib_event_msg(event->event), event->device->name, info);
 244
 245        switch (event->event) {
 246        case IB_EVENT_CQ_ERR:
 247        case IB_EVENT_QP_FATAL:
  248                smbd_disconnect_rdma_connection(info);
  249                break;
 250        default:
 251                break;
 252        }
 253}
 254
 255static inline void *smbd_request_payload(struct smbd_request *request)
 256{
 257        return (void *)request->packet;
 258}
 259
 260static inline void *smbd_response_payload(struct smbd_response *response)
 261{
 262        return (void *)response->packet;
 263}
 264
 265/* Called when a RDMA send is done */
 266static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 267{
 268        int i;
 269        struct smbd_request *request =
 270                container_of(wc->wr_cqe, struct smbd_request, cqe);
 271
 272        log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 273                request, wc->status);
 274
 275        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 276                log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 277                        wc->status, wc->opcode);
 278                smbd_disconnect_rdma_connection(request->info);
 279        }
 280
 281        for (i = 0; i < request->num_sge; i++)
 282                ib_dma_unmap_single(request->info->id->device,
 283                        request->sge[i].addr,
 284                        request->sge[i].length,
 285                        DMA_TO_DEVICE);
 286
 287        if (request->has_payload) {
 288                if (atomic_dec_and_test(&request->info->send_payload_pending))
 289                        wake_up(&request->info->wait_send_payload_pending);
 290        } else {
 291                if (atomic_dec_and_test(&request->info->send_pending))
 292                        wake_up(&request->info->wait_send_pending);
 293        }
 294
 295        mempool_free(request, request->info->request_mempool);
 296}
 297
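     /* Log all fields of a received SMBD negotiate response at INFO level */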
 298static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 299{
 300        log_rdma_event(INFO, "resp message min_version %u max_version %u "
 301                "negotiated_version %u credits_requested %u "
 302                "credits_granted %u status %u max_readwrite_size %u "
 303                "preferred_send_size %u max_receive_size %u "
 304                "max_fragmented_size %u\n",
 305                resp->min_version, resp->max_version, resp->negotiated_version,
 306                resp->credits_requested, resp->credits_granted, resp->status,
 307                resp->max_readwrite_size, resp->preferred_send_size,
 308                resp->max_receive_size, resp->max_fragmented_size);
 309}
 310
 311/*
  312 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
 313 * response, packet_length: the negotiation response message
 314 * return value: true if negotiation is a success, false if failed
 315 */
 316static bool process_negotiation_response(
 317                struct smbd_response *response, int packet_length)
 318{
 319        struct smbd_connection *info = response->info;
 320        struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 321
 322        if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 323                log_rdma_event(ERR,
 324                        "error: packet_length=%d\n", packet_length);
 325                return false;
 326        }
 327
 328        if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 329                log_rdma_event(ERR, "error: negotiated_version=%x\n",
 330                        le16_to_cpu(packet->negotiated_version));
 331                return false;
 332        }
 333        info->protocol = le16_to_cpu(packet->negotiated_version);
 334
 335        if (packet->credits_requested == 0) {
 336                log_rdma_event(ERR, "error: credits_requested==0\n");
 337                return false;
 338        }
 339        info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 340
 341        if (packet->credits_granted == 0) {
 342                log_rdma_event(ERR, "error: credits_granted==0\n");
 343                return false;
 344        }
 345        atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 346
 347        atomic_set(&info->receive_credits, 0);
 348
 349        if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 350                log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 351                        le32_to_cpu(packet->preferred_send_size));
 352                return false;
 353        }
 354        info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 355
 356        if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 357                log_rdma_event(ERR, "error: max_receive_size=%d\n",
 358                        le32_to_cpu(packet->max_receive_size));
 359                return false;
 360        }
 361        info->max_send_size = min_t(int, info->max_send_size,
 362                                        le32_to_cpu(packet->max_receive_size));
 363
 364        if (le32_to_cpu(packet->max_fragmented_size) <
 365                        SMBD_MIN_FRAGMENTED_SIZE) {
 366                log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 367                        le32_to_cpu(packet->max_fragmented_size));
 368                return false;
 369        }
 370        info->max_fragmented_send_size =
 371                le32_to_cpu(packet->max_fragmented_size);
 372        info->rdma_readwrite_threshold =
 373                rdma_readwrite_threshold > info->max_fragmented_send_size ?
 374                info->max_fragmented_send_size :
 375                rdma_readwrite_threshold;
 376
 377
 378        info->max_readwrite_size = min_t(u32,
 379                        le32_to_cpu(packet->max_readwrite_size),
 380                        info->max_frmr_depth * PAGE_SIZE);
 381        info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 382
 383        return true;
 384}
 385
 386/*
 387 * Check and schedule to send an immediate packet
  388 * This is used to extend credits to the remote peer to keep the transport busy
 389 */
 390static void check_and_send_immediate(struct smbd_connection *info)
 391{
 392        if (info->transport_status != SMBD_CONNECTED)
 393                return;
 394
 395        info->send_immediate = true;
 396
 397        /*
 398         * Promptly send a packet if our peer is running low on receive
 399         * credits
 400         */
 401        if (atomic_read(&info->receive_credits) <
 402                info->receive_credit_target - 1)
 403                queue_delayed_work(
 404                        info->workqueue, &info->send_immediate_work, 0);
 405}
 406
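     /*
      * Work handler to replenish receive credits. While below the peer's
      * credit target, post receive buffers (from the receive queue first,
      * then the empty packet queue), record the newly offered credits under
      * lock_new_credits_offered, and schedule an immediate send so the
      * credits can be granted to the peer
      */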
 407static void smbd_post_send_credits(struct work_struct *work)
 408{
 409        int ret = 0;
 410        int use_receive_queue = 1;
 411        int rc;
 412        struct smbd_response *response;
 413        struct smbd_connection *info =
 414                container_of(work, struct smbd_connection,
 415                        post_send_credits_work);
 416
 417        if (info->transport_status != SMBD_CONNECTED) {
 418                wake_up(&info->wait_receive_queues);
 419                return;
 420        }
 421
 422        if (info->receive_credit_target >
 423                atomic_read(&info->receive_credits)) {
 424                while (true) {
 425                        if (use_receive_queue)
 426                                response = get_receive_buffer(info);
 427                        else
 428                                response = get_empty_queue_buffer(info);
 429                        if (!response) {
  430                                /* now switch to empty packet queue */
 431                                if (use_receive_queue) {
 432                                        use_receive_queue = 0;
 433                                        continue;
 434                                } else
 435                                        break;
 436                        }
 437
 438                        response->type = SMBD_TRANSFER_DATA;
 439                        response->first_segment = false;
 440                        rc = smbd_post_recv(info, response);
 441                        if (rc) {
 442                                log_rdma_recv(ERR,
 443                                        "post_recv failed rc=%d\n", rc);
 444                                put_receive_buffer(info, response);
 445                                break;
 446                        }
 447
 448                        ret++;
 449                }
 450        }
 451
 452        spin_lock(&info->lock_new_credits_offered);
 453        info->new_credits_offered += ret;
 454        spin_unlock(&info->lock_new_credits_offered);
 455
 456        atomic_add(ret, &info->receive_credits);
 457
 458        /* Check if we can post new receive and grant credits to peer */
 459        check_and_send_immediate(info);
 460}
 461
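     /*
      * Work handler run after a receive completes: wake up any sender
      * waiting for send credits, and check whether we need to grant more
      * credits or respond to a KEEP_ALIVE request
      */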
 462static void smbd_recv_done_work(struct work_struct *work)
 463{
 464        struct smbd_connection *info =
 465                container_of(work, struct smbd_connection, recv_done_work);
 466
 467        /*
 468         * We may have new send credits granted from remote peer
  469         * If any sender is blocked on lack of credits, unblock it
 470         */
 471        if (atomic_read(&info->send_credits))
 472                wake_up_interruptible(&info->wait_send_queue);
 473
 474        /*
 475         * Check if we need to send something to remote peer to
 476         * grant more credits or respond to KEEP_ALIVE packet
 477         */
 478        check_and_send_immediate(info);
 479}
 480
 481/* Called from softirq, when recv is done */
 482static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 483{
 484        struct smbd_data_transfer *data_transfer;
 485        struct smbd_response *response =
 486                container_of(wc->wr_cqe, struct smbd_response, cqe);
 487        struct smbd_connection *info = response->info;
 488        int data_length = 0;
 489
 490        log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d "
 491                      "byte_len=%d pkey_index=%x\n",
 492                response, response->type, wc->status, wc->opcode,
 493                wc->byte_len, wc->pkey_index);
 494
 495        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 496                log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 497                        wc->status, wc->opcode);
 498                smbd_disconnect_rdma_connection(info);
 499                goto error;
 500        }
 501
 502        ib_dma_sync_single_for_cpu(
 503                wc->qp->device,
 504                response->sge.addr,
 505                response->sge.length,
 506                DMA_FROM_DEVICE);
 507
 508        switch (response->type) {
 509        /* SMBD negotiation response */
 510        case SMBD_NEGOTIATE_RESP:
 511                dump_smbd_negotiate_resp(smbd_response_payload(response));
 512                info->full_packet_received = true;
 513                info->negotiate_done =
 514                        process_negotiation_response(response, wc->byte_len);
 515                complete(&info->negotiate_completion);
 516                break;
 517
 518        /* SMBD data transfer packet */
 519        case SMBD_TRANSFER_DATA:
 520                data_transfer = smbd_response_payload(response);
 521                data_length = le32_to_cpu(data_transfer->data_length);
 522
 523                /*
  524                 * If this is a packet with a data payload, place the data in
  525                 * the reassembly queue and wake up the reading thread
 526                 */
 527                if (data_length) {
 528                        if (info->full_packet_received)
 529                                response->first_segment = true;
 530
 531                        if (le32_to_cpu(data_transfer->remaining_data_length))
 532                                info->full_packet_received = false;
 533                        else
 534                                info->full_packet_received = true;
 535
 536                        enqueue_reassembly(
 537                                info,
 538                                response,
 539                                data_length);
 540                } else
 541                        put_empty_packet(info, response);
 542
 543                if (data_length)
 544                        wake_up_interruptible(&info->wait_reassembly_queue);
 545
 546                atomic_dec(&info->receive_credits);
 547                info->receive_credit_target =
 548                        le16_to_cpu(data_transfer->credits_requested);
 549                atomic_add(le16_to_cpu(data_transfer->credits_granted),
 550                        &info->send_credits);
 551
 552                log_incoming(INFO, "data flags %d data_offset %d "
 553                        "data_length %d remaining_data_length %d\n",
 554                        le16_to_cpu(data_transfer->flags),
 555                        le32_to_cpu(data_transfer->data_offset),
 556                        le32_to_cpu(data_transfer->data_length),
 557                        le32_to_cpu(data_transfer->remaining_data_length));
 558
 559                /* Send a KEEP_ALIVE response right away if requested */
 560                info->keep_alive_requested = KEEP_ALIVE_NONE;
 561                if (le16_to_cpu(data_transfer->flags) &
 562                                SMB_DIRECT_RESPONSE_REQUESTED) {
 563                        info->keep_alive_requested = KEEP_ALIVE_PENDING;
 564                }
 565
 566                queue_work(info->workqueue, &info->recv_done_work);
 567                return;
 568
 569        default:
 570                log_rdma_recv(ERR,
 571                        "unexpected response type=%d\n", response->type);
 572        }
 573
 574error:
 575        put_receive_buffer(info, response);
 576}
 577
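     /*
      * Create an RDMA CM id for this connection and resolve the destination
      * address and route, waiting up to RDMA_RESOLVE_TIMEOUT ms for each step
      * return value: the cm_id on success, or an ERR_PTR on failure
      */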
 578static struct rdma_cm_id *smbd_create_id(
 579                struct smbd_connection *info,
 580                struct sockaddr *dstaddr, int port)
 581{
 582        struct rdma_cm_id *id;
 583        int rc;
 584        __be16 *sport;
 585
 586        id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 587                RDMA_PS_TCP, IB_QPT_RC);
 588        if (IS_ERR(id)) {
 589                rc = PTR_ERR(id);
 590                log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 591                return id;
 592        }
 593
 594        if (dstaddr->sa_family == AF_INET6)
 595                sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 596        else
 597                sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 598
 599        *sport = htons(port);
 600
 601        init_completion(&info->ri_done);
 602        info->ri_rc = -ETIMEDOUT;
 603
 604        rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 605                RDMA_RESOLVE_TIMEOUT);
 606        if (rc) {
 607                log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 608                goto out;
 609        }
 610        wait_for_completion_interruptible_timeout(
 611                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 612        rc = info->ri_rc;
 613        if (rc) {
 614                log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 615                goto out;
 616        }
 617
 618        info->ri_rc = -ETIMEDOUT;
 619        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 620        if (rc) {
 621                log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 622                goto out;
 623        }
 624        wait_for_completion_interruptible_timeout(
 625                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 626        rc = info->ri_rc;
 627        if (rc) {
 628                log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 629                goto out;
 630        }
 631
 632        return id;
 633
 634out:
 635        rdma_destroy_id(id);
 636        return ERR_PTR(rc);
 637}
 638
 639/*
 640 * Test if FRWR (Fast Registration Work Requests) is supported on the device
  641 * This implementation requires FRWR for RDMA read/write
 642 * return value: true if it is supported
 643 */
 644static bool frwr_is_supported(struct ib_device_attr *attrs)
 645{
 646        if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 647                return false;
 648        if (attrs->max_fast_reg_page_list_len == 0)
 649                return false;
 650        return true;
 651}
 652
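     /*
      * Open the interface adapter: create and resolve the cm_id, verify FRWR
      * support, choose the FRMR depth and MR type, and allocate the
      * protection domain used for later memory registrations
      */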
 653static int smbd_ia_open(
 654                struct smbd_connection *info,
 655                struct sockaddr *dstaddr, int port)
 656{
 657        int rc;
 658
 659        info->id = smbd_create_id(info, dstaddr, port);
 660        if (IS_ERR(info->id)) {
 661                rc = PTR_ERR(info->id);
 662                goto out1;
 663        }
 664
 665        if (!frwr_is_supported(&info->id->device->attrs)) {
 666                log_rdma_event(ERR,
 667                        "Fast Registration Work Requests "
 668                        "(FRWR) is not supported\n");
 669                log_rdma_event(ERR,
 670                        "Device capability flags = %llx "
 671                        "max_fast_reg_page_list_len = %u\n",
 672                        info->id->device->attrs.device_cap_flags,
 673                        info->id->device->attrs.max_fast_reg_page_list_len);
 674                rc = -EPROTONOSUPPORT;
 675                goto out2;
 676        }
 677        info->max_frmr_depth = min_t(int,
 678                smbd_max_frmr_depth,
 679                info->id->device->attrs.max_fast_reg_page_list_len);
 680        info->mr_type = IB_MR_TYPE_MEM_REG;
 681        if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 682                info->mr_type = IB_MR_TYPE_SG_GAPS;
 683
 684        info->pd = ib_alloc_pd(info->id->device, 0);
 685        if (IS_ERR(info->pd)) {
 686                rc = PTR_ERR(info->pd);
 687                log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 688                goto out2;
 689        }
 690
 691        return 0;
 692
 693out2:
 694        rdma_destroy_id(info->id);
 695        info->id = NULL;
 696
 697out1:
 698        return rc;
 699}
 700
 701/*
 702 * Send a negotiation request message to the peer
 703 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 704 * After negotiation, the transport is connected and ready for
 705 * carrying upper layer SMB payload
 706 */
 707static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 708{
 709        struct ib_send_wr send_wr;
 710        int rc = -ENOMEM;
 711        struct smbd_request *request;
 712        struct smbd_negotiate_req *packet;
 713
 714        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 715        if (!request)
 716                return rc;
 717
 718        request->info = info;
 719
 720        packet = smbd_request_payload(request);
 721        packet->min_version = cpu_to_le16(SMBD_V1);
 722        packet->max_version = cpu_to_le16(SMBD_V1);
 723        packet->reserved = 0;
 724        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 725        packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 726        packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 727        packet->max_fragmented_size =
 728                cpu_to_le32(info->max_fragmented_recv_size);
 729
 730        request->num_sge = 1;
 731        request->sge[0].addr = ib_dma_map_single(
 732                                info->id->device, (void *)packet,
 733                                sizeof(*packet), DMA_TO_DEVICE);
 734        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 735                rc = -EIO;
 736                goto dma_mapping_failed;
 737        }
 738
 739        request->sge[0].length = sizeof(*packet);
 740        request->sge[0].lkey = info->pd->local_dma_lkey;
 741
 742        ib_dma_sync_single_for_device(
 743                info->id->device, request->sge[0].addr,
 744                request->sge[0].length, DMA_TO_DEVICE);
 745
 746        request->cqe.done = send_done;
 747
 748        send_wr.next = NULL;
 749        send_wr.wr_cqe = &request->cqe;
 750        send_wr.sg_list = request->sge;
 751        send_wr.num_sge = request->num_sge;
 752        send_wr.opcode = IB_WR_SEND;
 753        send_wr.send_flags = IB_SEND_SIGNALED;
 754
 755        log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 756                request->sge[0].addr,
 757                request->sge[0].length, request->sge[0].lkey);
 758
 759        request->has_payload = false;
 760        atomic_inc(&info->send_pending);
 761        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 762        if (!rc)
 763                return 0;
 764
 765        /* if we reach here, post send failed */
 766        log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 767        atomic_dec(&info->send_pending);
 768        ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 769                request->sge[0].length, DMA_TO_DEVICE);
 770
 771        smbd_disconnect_rdma_connection(info);
 772
 773dma_mapping_failed:
 774        mempool_free(request, info->request_mempool);
 775        return rc;
 776}
 777
 778/*
 779 * Extend the credits to remote peer
 780 * This implements [MS-SMBD] 3.1.5.9
 781 * The idea is that we should extend credits to remote peer as quickly as
  782 * it's allowed, to maintain data flow. We allocate as many receive
  783 * buffers as possible, and extend the receive credits to the remote peer
  784 * return value: the new credits being granted.
 785 */
 786static int manage_credits_prior_sending(struct smbd_connection *info)
 787{
 788        int new_credits;
 789
 790        spin_lock(&info->lock_new_credits_offered);
 791        new_credits = info->new_credits_offered;
 792        info->new_credits_offered = 0;
 793        spin_unlock(&info->lock_new_credits_offered);
 794
 795        return new_credits;
 796}
 797
 798/*
 799 * Check if we need to send a KEEP_ALIVE message
  800 * The idle connection timer triggers a KEEP_ALIVE message when it expires
 801 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have peer send
 802 * back a response.
 803 * return value:
 804 * 1 if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 805 * 0: otherwise
 806 */
 807static int manage_keep_alive_before_sending(struct smbd_connection *info)
 808{
 809        if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 810                info->keep_alive_requested = KEEP_ALIVE_SENT;
 811                return 1;
 812        }
 813        return 0;
 814}
 815
 816/*
 817 * Build and prepare the SMBD packet header
  818 * This function waits for available send credits and builds an SMBD packet
  819 * header. The caller can then optionally append a payload to the packet after
  820 * the header
  821 * input values
 822 * size: the size of the payload
 823 * remaining_data_length: remaining data to send if this is part of a
 824 * fragmented packet
 825 * output values
 826 * request_out: the request allocated from this function
 827 * return values: 0 on success, otherwise actual error code returned
 828 */
 829static int smbd_create_header(struct smbd_connection *info,
 830                int size, int remaining_data_length,
 831                struct smbd_request **request_out)
 832{
 833        struct smbd_request *request;
 834        struct smbd_data_transfer *packet;
 835        int header_length;
 836        int rc;
 837
 838        /* Wait for send credits. A SMBD packet needs one credit */
 839        rc = wait_event_interruptible(info->wait_send_queue,
 840                atomic_read(&info->send_credits) > 0 ||
 841                info->transport_status != SMBD_CONNECTED);
 842        if (rc)
 843                return rc;
 844
 845        if (info->transport_status != SMBD_CONNECTED) {
 846                log_outgoing(ERR, "disconnected not sending\n");
 847                return -EAGAIN;
 848        }
 849        atomic_dec(&info->send_credits);
 850
 851        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 852        if (!request) {
 853                rc = -ENOMEM;
 854                goto err;
 855        }
 856
 857        request->info = info;
 858
 859        /* Fill in the packet header */
 860        packet = smbd_request_payload(request);
 861        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 862        packet->credits_granted =
 863                cpu_to_le16(manage_credits_prior_sending(info));
 864        info->send_immediate = false;
 865
 866        packet->flags = 0;
 867        if (manage_keep_alive_before_sending(info))
 868                packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 869
 870        packet->reserved = 0;
 871        if (!size)
 872                packet->data_offset = 0;
 873        else
 874                packet->data_offset = cpu_to_le32(24);
 875        packet->data_length = cpu_to_le32(size);
 876        packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 877        packet->padding = 0;
 878
 879        log_outgoing(INFO, "credits_requested=%d credits_granted=%d "
 880                "data_offset=%d data_length=%d remaining_data_length=%d\n",
 881                le16_to_cpu(packet->credits_requested),
 882                le16_to_cpu(packet->credits_granted),
 883                le32_to_cpu(packet->data_offset),
 884                le32_to_cpu(packet->data_length),
 885                le32_to_cpu(packet->remaining_data_length));
 886
 887        /* Map the packet to DMA */
 888        header_length = sizeof(struct smbd_data_transfer);
 889        /* If this is a packet without payload, don't send padding */
 890        if (!size)
 891                header_length = offsetof(struct smbd_data_transfer, padding);
 892
 893        request->num_sge = 1;
 894        request->sge[0].addr = ib_dma_map_single(info->id->device,
 895                                                 (void *)packet,
 896                                                 header_length,
 897                                                 DMA_TO_DEVICE);
 898        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 899                mempool_free(request, info->request_mempool);
 900                rc = -EIO;
 901                goto err;
 902        }
 903
 904        request->sge[0].length = header_length;
 905        request->sge[0].lkey = info->pd->local_dma_lkey;
 906
 907        *request_out = request;
 908        return 0;
 909
 910err:
 911        atomic_inc(&info->send_credits);
 912        return rc;
 913}
 914
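     /* Undo smbd_create_header: unmap the header, free the request and return the send credit */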
 915static void smbd_destroy_header(struct smbd_connection *info,
 916                struct smbd_request *request)
 917{
 918
 919        ib_dma_unmap_single(info->id->device,
 920                            request->sge[0].addr,
 921                            request->sge[0].length,
 922                            DMA_TO_DEVICE);
 923        mempool_free(request, info->request_mempool);
 924        atomic_inc(&info->send_credits);
 925}
 926
 927/* Post the send request */
 928static int smbd_post_send(struct smbd_connection *info,
 929                struct smbd_request *request, bool has_payload)
 930{
 931        struct ib_send_wr send_wr;
 932        int rc, i;
 933
 934        for (i = 0; i < request->num_sge; i++) {
 935                log_rdma_send(INFO,
 936                        "rdma_request sge[%d] addr=%llu length=%u\n",
 937                        i, request->sge[i].addr, request->sge[i].length);
 938                ib_dma_sync_single_for_device(
 939                        info->id->device,
 940                        request->sge[i].addr,
 941                        request->sge[i].length,
 942                        DMA_TO_DEVICE);
 943        }
 944
 945        request->cqe.done = send_done;
 946
 947        send_wr.next = NULL;
 948        send_wr.wr_cqe = &request->cqe;
 949        send_wr.sg_list = request->sge;
 950        send_wr.num_sge = request->num_sge;
 951        send_wr.opcode = IB_WR_SEND;
 952        send_wr.send_flags = IB_SEND_SIGNALED;
 953
 954        if (has_payload) {
 955                request->has_payload = true;
 956                atomic_inc(&info->send_payload_pending);
 957        } else {
 958                request->has_payload = false;
 959                atomic_inc(&info->send_pending);
 960        }
 961
 962        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 963        if (rc) {
 964                log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 965                if (has_payload) {
 966                        if (atomic_dec_and_test(&info->send_payload_pending))
 967                                wake_up(&info->wait_send_payload_pending);
 968                } else {
 969                        if (atomic_dec_and_test(&info->send_pending))
 970                                wake_up(&info->wait_send_pending);
 971                }
 972                smbd_disconnect_rdma_connection(info);
 973                rc = -EAGAIN;
 974        } else
 975                /* Reset timer for idle connection after packet is sent */
 976                mod_delayed_work(info->workqueue, &info->idle_timer_work,
 977                        info->keep_alive_interval*HZ);
 978
 979        return rc;
 980}
 981
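     /*
      * Build and send one SMBD packet from a scatterlist: create the header
      * (consuming a send credit), DMA-map each scatterlist entry into the
      * following SGEs, then post the send. On failure, unmap the entries and
      * destroy the header
      */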
 982static int smbd_post_send_sgl(struct smbd_connection *info,
 983        struct scatterlist *sgl, int data_length, int remaining_data_length)
 984{
 985        int num_sgs;
 986        int i, rc;
 987        struct smbd_request *request;
 988        struct scatterlist *sg;
 989
 990        rc = smbd_create_header(
 991                info, data_length, remaining_data_length, &request);
 992        if (rc)
 993                return rc;
 994
 995        num_sgs = sgl ? sg_nents(sgl) : 0;
 996        for_each_sg(sgl, sg, num_sgs, i) {
 997                request->sge[i+1].addr =
 998                        ib_dma_map_page(info->id->device, sg_page(sg),
 999                               sg->offset, sg->length, DMA_TO_DEVICE);
1000                if (ib_dma_mapping_error(
1001                                info->id->device, request->sge[i+1].addr)) {
1002                        rc = -EIO;
1003                        request->sge[i+1].addr = 0;
1004                        goto dma_mapping_failure;
1005                }
1006                request->sge[i+1].length = sg->length;
1007                request->sge[i+1].lkey = info->pd->local_dma_lkey;
1008                request->num_sge++;
1009        }
1010
1011        rc = smbd_post_send(info, request, data_length);
1012        if (!rc)
1013                return 0;
1014
1015dma_mapping_failure:
1016        for (i = 1; i < request->num_sge; i++)
1017                if (request->sge[i].addr)
1018                        ib_dma_unmap_single(info->id->device,
1019                                            request->sge[i].addr,
1020                                            request->sge[i].length,
1021                                            DMA_TO_DEVICE);
1022        smbd_destroy_header(info, request);
1023        return rc;
1024}
1025
1026/*
1027 * Send a page
1028 * page: the page to send
1029 * offset: offset in the page to send
1030 * size: length in the page to send
1031 * remaining_data_length: remaining data to send in this payload
1032 */
1033static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
1034                unsigned long offset, size_t size, int remaining_data_length)
1035{
1036        struct scatterlist sgl;
1037
1038        sg_init_table(&sgl, 1);
1039        sg_set_page(&sgl, page, size, offset);
1040
1041        return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
1042}
1043
1044/*
1045 * Send an empty message
 1046 * An empty message is used to extend credits to the peer or for keep alive
 1047 * while there is no upper layer payload to send at the time
1048 */
1049static int smbd_post_send_empty(struct smbd_connection *info)
1050{
1051        info->count_send_empty++;
1052        return smbd_post_send_sgl(info, NULL, 0, 0);
1053}
1054
1055/*
1056 * Send a data buffer
1057 * iov: the iov array describing the data buffers
 1058 * n_vec: number of entries in the iov array
1059 * remaining_data_length: remaining data to send following this packet
1060 * in segmented SMBD packet
1061 */
1062static int smbd_post_send_data(
1063        struct smbd_connection *info, struct kvec *iov, int n_vec,
1064        int remaining_data_length)
1065{
1066        int i;
1067        u32 data_length = 0;
1068        struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1069
1070        if (n_vec > SMBDIRECT_MAX_SGE) {
1071                cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1072                return -ENOMEM;
1073        }
1074
1075        sg_init_table(sgl, n_vec);
1076        for (i = 0; i < n_vec; i++) {
1077                data_length += iov[i].iov_len;
1078                sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1079        }
1080
1081        return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1082}
1083
1084/*
1085 * Post a receive request to the transport
1086 * The remote peer can only send data when a receive request is posted
1087 * The interaction is controlled by send/receive credit system
1088 */
1089static int smbd_post_recv(
1090                struct smbd_connection *info, struct smbd_response *response)
1091{
1092        struct ib_recv_wr recv_wr;
1093        int rc = -EIO;
1094
1095        response->sge.addr = ib_dma_map_single(
1096                                info->id->device, response->packet,
1097                                info->max_receive_size, DMA_FROM_DEVICE);
1098        if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1099                return rc;
1100
1101        response->sge.length = info->max_receive_size;
1102        response->sge.lkey = info->pd->local_dma_lkey;
1103
1104        response->cqe.done = recv_done;
1105
1106        recv_wr.wr_cqe = &response->cqe;
1107        recv_wr.next = NULL;
1108        recv_wr.sg_list = &response->sge;
1109        recv_wr.num_sge = 1;
1110
1111        rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
1112        if (rc) {
1113                ib_dma_unmap_single(info->id->device, response->sge.addr,
1114                                    response->sge.length, DMA_FROM_DEVICE);
1115                smbd_disconnect_rdma_connection(info);
1116                log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1117        }
1118
1119        return rc;
1120}
1121
1122/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1123static int smbd_negotiate(struct smbd_connection *info)
1124{
1125        int rc;
1126        struct smbd_response *response = get_receive_buffer(info);
1127
1128        response->type = SMBD_NEGOTIATE_RESP;
1129        rc = smbd_post_recv(info, response);
1130        log_rdma_event(INFO,
1131                "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x "
1132                "iov.lkey=%x\n",
1133                rc, response->sge.addr,
1134                response->sge.length, response->sge.lkey);
1135        if (rc)
1136                return rc;
1137
1138        init_completion(&info->negotiate_completion);
1139        info->negotiate_done = false;
1140        rc = smbd_post_send_negotiate_req(info);
1141        if (rc)
1142                return rc;
1143
1144        rc = wait_for_completion_interruptible_timeout(
1145                &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1146        log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1147
1148        if (info->negotiate_done)
1149                return 0;
1150
1151        if (rc == 0)
1152                rc = -ETIMEDOUT;
1153        else if (rc == -ERESTARTSYS)
1154                rc = -EINTR;
1155        else
1156                rc = -ENOTCONN;
1157
1158        return rc;
1159}
1160
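     /*
      * Queue a response that carried no payload on the empty packet queue so
      * its buffer can be reposted, then schedule credit replenishment
      */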
1161static void put_empty_packet(
1162                struct smbd_connection *info, struct smbd_response *response)
1163{
1164        spin_lock(&info->empty_packet_queue_lock);
1165        list_add_tail(&response->list, &info->empty_packet_queue);
1166        info->count_empty_packet_queue++;
1167        spin_unlock(&info->empty_packet_queue_lock);
1168
1169        queue_work(info->workqueue, &info->post_send_credits_work);
1170}
1171
1172/*
1173 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
 1174 * This is a queue for reassembling upper layer payload and presenting it to the
 1175 * upper layer. All incoming payloads go to the reassembly queue, regardless of
 1176 * whether reassembly is required. The upper layer code reads from the queue for all
1177 * incoming payloads.
1178 * Put a received packet to the reassembly queue
1179 * response: the packet received
1180 * data_length: the size of payload in this packet
1181 */
1182static void enqueue_reassembly(
1183        struct smbd_connection *info,
1184        struct smbd_response *response,
1185        int data_length)
1186{
1187        spin_lock(&info->reassembly_queue_lock);
1188        list_add_tail(&response->list, &info->reassembly_queue);
1189        info->reassembly_queue_length++;
1190        /*
1191         * Make sure reassembly_data_length is updated after list and
1192         * reassembly_queue_length are updated. On the dequeue side
1193         * reassembly_data_length is checked without a lock to determine
1194         * if reassembly_queue_length and list is up to date
1195         */
1196        virt_wmb();
1197        info->reassembly_data_length += data_length;
1198        spin_unlock(&info->reassembly_queue_lock);
1199        info->count_reassembly_queue++;
1200        info->count_enqueue_reassembly_queue++;
1201}
1202
1203/*
1204 * Get the first entry at the front of reassembly queue
1205 * Caller is responsible for locking
1206 * return value: the first entry if any, NULL if queue is empty
1207 */
1208static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1209{
1210        struct smbd_response *ret = NULL;
1211
1212        if (!list_empty(&info->reassembly_queue)) {
1213                ret = list_first_entry(
1214                        &info->reassembly_queue,
1215                        struct smbd_response, list);
1216        }
1217        return ret;
1218}
1219
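     /* Take a buffer off the empty packet queue, or return NULL if it is empty */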
1220static struct smbd_response *get_empty_queue_buffer(
1221                struct smbd_connection *info)
1222{
1223        struct smbd_response *ret = NULL;
1224        unsigned long flags;
1225
1226        spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1227        if (!list_empty(&info->empty_packet_queue)) {
1228                ret = list_first_entry(
1229                        &info->empty_packet_queue,
1230                        struct smbd_response, list);
1231                list_del(&ret->list);
1232                info->count_empty_packet_queue--;
1233        }
1234        spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1235
1236        return ret;
1237}
1238
1239/*
1240 * Get a receive buffer
1241 * For each remote send, we need to post a receive. The receive buffers are
 1242 * preallocated when the transport is established.
1243 * return value: the receive buffer, NULL if none is available
1244 */
1245static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1246{
1247        struct smbd_response *ret = NULL;
1248        unsigned long flags;
1249
1250        spin_lock_irqsave(&info->receive_queue_lock, flags);
1251        if (!list_empty(&info->receive_queue)) {
1252                ret = list_first_entry(
1253                        &info->receive_queue,
1254                        struct smbd_response, list);
1255                list_del(&ret->list);
1256                info->count_receive_queue--;
1257                info->count_get_receive_buffer++;
1258        }
1259        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1260
1261        return ret;
1262}
1263
1264/*
1265 * Return a receive buffer
1266 * Upon returning of a receive buffer, we can post new receive and extend
1267 * more receive credits to remote peer. This is done immediately after a
1268 * receive buffer is returned.
1269 */
1270static void put_receive_buffer(
1271        struct smbd_connection *info, struct smbd_response *response)
1272{
1273        unsigned long flags;
1274
1275        ib_dma_unmap_single(info->id->device, response->sge.addr,
1276                response->sge.length, DMA_FROM_DEVICE);
1277
1278        spin_lock_irqsave(&info->receive_queue_lock, flags);
1279        list_add_tail(&response->list, &info->receive_queue);
1280        info->count_receive_queue++;
1281        info->count_put_receive_buffer++;
1282        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1283
1284        queue_work(info->workqueue, &info->post_send_credits_work);
1285}
1286
 1287/* Preallocate all receive buffers on transport establishment */
1288static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1289{
1290        int i;
1291        struct smbd_response *response;
1292
1293        INIT_LIST_HEAD(&info->reassembly_queue);
1294        spin_lock_init(&info->reassembly_queue_lock);
1295        info->reassembly_data_length = 0;
1296        info->reassembly_queue_length = 0;
1297
1298        INIT_LIST_HEAD(&info->receive_queue);
1299        spin_lock_init(&info->receive_queue_lock);
1300        info->count_receive_queue = 0;
1301
1302        INIT_LIST_HEAD(&info->empty_packet_queue);
1303        spin_lock_init(&info->empty_packet_queue_lock);
1304        info->count_empty_packet_queue = 0;
1305
1306        init_waitqueue_head(&info->wait_receive_queues);
1307
1308        for (i = 0; i < num_buf; i++) {
1309                response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1310                if (!response)
1311                        goto allocate_failed;
1312
1313                response->info = info;
1314                list_add_tail(&response->list, &info->receive_queue);
1315                info->count_receive_queue++;
1316        }
1317
1318        return 0;
1319
1320allocate_failed:
1321        while (!list_empty(&info->receive_queue)) {
1322                response = list_first_entry(
1323                                &info->receive_queue,
1324                                struct smbd_response, list);
1325                list_del(&response->list);
1326                info->count_receive_queue--;
1327
1328                mempool_free(response, info->response_mempool);
1329        }
1330        return -ENOMEM;
1331}
1332
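     /* Free every preallocated receive buffer from both queues back to the mempool */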
1333static void destroy_receive_buffers(struct smbd_connection *info)
1334{
1335        struct smbd_response *response;
1336
1337        while ((response = get_receive_buffer(info)))
1338                mempool_free(response, info->response_mempool);
1339
1340        while ((response = get_empty_queue_buffer(info)))
1341                mempool_free(response, info->response_mempool);
1342}
1343
1344/*
1345 * Check and send an immediate or keep alive packet
 1346 * The conditions to send those packets are defined in [MS-SMBD] 3.1.1.1
 1347 * Connection.KeepaliveRequested and Connection.SendImmediate
 1348 * The idea is to extend credits to the server as soon as they become available
1349 */
1350static void send_immediate_work(struct work_struct *work)
1351{
1352        struct smbd_connection *info = container_of(
1353                                        work, struct smbd_connection,
1354                                        send_immediate_work.work);
1355
1356        if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
1357            info->send_immediate) {
1358                log_keep_alive(INFO, "send an empty message\n");
1359                smbd_post_send_empty(info);
1360        }
1361}
1362
1363/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1364static void idle_connection_timer(struct work_struct *work)
1365{
1366        struct smbd_connection *info = container_of(
1367                                        work, struct smbd_connection,
1368                                        idle_timer_work.work);
1369
1370        if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1371                log_keep_alive(ERR,
1372                        "error status info->keep_alive_requested=%d\n",
1373                        info->keep_alive_requested);
1374                smbd_disconnect_rdma_connection(info);
1375                return;
1376        }
1377
1378        log_keep_alive(INFO, "about to send an empty idle message\n");
1379        smbd_post_send_empty(info);
1380
1381        /* Setup the next idle timeout work */
1382        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1383                        info->keep_alive_interval*HZ);
1384}
1385
1386/*
1387 * Destroy the transport and related RDMA and memory resources
 1388 * Need to go through all the pending counters and make sure no one is using
1389 * the transport while it is destroyed
1390 */
1391void smbd_destroy(struct TCP_Server_Info *server)
1392{
1393        struct smbd_connection *info = server->smbd_conn;
1394        struct smbd_response *response;
1395        unsigned long flags;
1396
1397        if (!info) {
1398                log_rdma_event(INFO, "rdma session already destroyed\n");
1399                return;
1400        }
1401
1402        log_rdma_event(INFO, "destroying rdma session\n");
1403        if (info->transport_status != SMBD_DISCONNECTED) {
1404                rdma_disconnect(server->smbd_conn->id);
1405                log_rdma_event(INFO, "wait for transport being disconnected\n");
1406                wait_event_interruptible(
1407                        info->disconn_wait,
1408                        info->transport_status == SMBD_DISCONNECTED);
1409        }
1410
1411        log_rdma_event(INFO, "destroying qp\n");
1412        ib_drain_qp(info->id->qp);
1413        rdma_destroy_qp(info->id);
1414
1415        log_rdma_event(INFO, "cancelling idle timer\n");
1416        cancel_delayed_work_sync(&info->idle_timer_work);
1417        log_rdma_event(INFO, "cancelling send immediate work\n");
1418        cancel_delayed_work_sync(&info->send_immediate_work);
1419
1420        log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
1421        wait_event(info->wait_send_pending,
1422                atomic_read(&info->send_pending) == 0);
1423        wait_event(info->wait_send_payload_pending,
1424                atomic_read(&info->send_payload_pending) == 0);
1425
1426        /* It's not possible for the upper layer to get to reassembly */
1427        log_rdma_event(INFO, "drain the reassembly queue\n");
1428        do {
1429                spin_lock_irqsave(&info->reassembly_queue_lock, flags);
1430                response = _get_first_reassembly(info);
1431                if (response) {
1432                        list_del(&response->list);
1433                        spin_unlock_irqrestore(
1434                                &info->reassembly_queue_lock, flags);
1435                        put_receive_buffer(info, response);
1436                } else
1437                        spin_unlock_irqrestore(
1438                                &info->reassembly_queue_lock, flags);
1439        } while (response);
1440        info->reassembly_data_length = 0;
1441
1442        log_rdma_event(INFO, "free receive buffers\n");
1443        wait_event(info->wait_receive_queues,
1444                info->count_receive_queue + info->count_empty_packet_queue
1445                        == info->receive_credit_max);
1446        destroy_receive_buffers(info);
1447
1448        /*
1449         * For performance reasons, memory registration and deregistration
1450         * are not locked by srv_mutex. It is possible some processes are
1451         * blocked on the transport srv_mutex while holding memory registrations.
1452         * Release the transport srv_mutex to allow them to hit the failure
1453         * path when sending data, and then release their memory registrations.
1454         */
1455        log_rdma_event(INFO, "freeing mr list\n");
1456        wake_up_interruptible_all(&info->wait_mr);
1457        while (atomic_read(&info->mr_used_count)) {
1458                mutex_unlock(&server->srv_mutex);
1459                msleep(1000);
1460                mutex_lock(&server->srv_mutex);
1461        }
1462        destroy_mr_list(info);
1463
1464        ib_free_cq(info->send_cq);
1465        ib_free_cq(info->recv_cq);
1466        ib_dealloc_pd(info->pd);
1467        rdma_destroy_id(info->id);
1468
1469        /* free mempools */
1470        mempool_destroy(info->request_mempool);
1471        kmem_cache_destroy(info->request_cache);
1472
1473        mempool_destroy(info->response_mempool);
1474        kmem_cache_destroy(info->response_cache);
1475
1476        info->transport_status = SMBD_DESTROYED;
1477
1478        destroy_workqueue(info->workqueue);
1479        kfree(info);
1480}
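
/*
 * Illustrative sketch (hypothetical "my_*" names, not from this file): the
 * pending-counter drain pattern smbd_destroy() uses for send_pending,
 * send_payload_pending and mr_used_count.  Completion paths pair
 * atomic_dec_and_test() with wake_up(); the teardown path simply waits for
 * the counter to reach zero before freeing resources.
 */
struct my_ctx {
        atomic_t pending;
        wait_queue_head_t wait_pending;
};

static void my_io_done(struct my_ctx *ctx)
{
        /* last completion wakes up anyone draining the counter */
        if (atomic_dec_and_test(&ctx->pending))
                wake_up(&ctx->wait_pending);
}

static void my_drain(struct my_ctx *ctx)
{
        /* block until every outstanding I/O has completed */
        wait_event(ctx->wait_pending, atomic_read(&ctx->pending) == 0);
}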
1481
1482/*
1483 * Reconnect this SMBD connection, called from upper layer
1484 * return value: 0 on success, or actual error code
1485 */
1486int smbd_reconnect(struct TCP_Server_Info *server)
1487{
1488        log_rdma_event(INFO, "reconnecting rdma session\n");
1489
1490        if (!server->smbd_conn) {
1491                log_rdma_event(INFO, "rdma session already destroyed\n");
1492                goto create_conn;
1493        }
1494
1495        /*
1496         * This is possible if the transport is disconnected and we haven't received
1497         * a notification from RDMA, but the upper layer has detected a timeout
1498         */
1499        if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1500                log_rdma_event(INFO, "disconnecting transport\n");
1501                smbd_destroy(server);
1502        }
1503
1504create_conn:
1505        log_rdma_event(INFO, "creating rdma session\n");
1506        server->smbd_conn = smbd_get_connection(
1507                server, (struct sockaddr *) &server->dstaddr);
1508        log_rdma_event(INFO, "created rdma session info=%p\n",
1509                server->smbd_conn);
1510
1511        return server->smbd_conn ? 0 : -ENOENT;
1512}
1513
1514static void destroy_caches_and_workqueue(struct smbd_connection *info)
1515{
1516        destroy_receive_buffers(info);
1517        destroy_workqueue(info->workqueue);
1518        mempool_destroy(info->response_mempool);
1519        kmem_cache_destroy(info->response_cache);
1520        mempool_destroy(info->request_mempool);
1521        kmem_cache_destroy(info->request_cache);
1522}
1523
1524#define MAX_NAME_LEN    80
1525static int allocate_caches_and_workqueue(struct smbd_connection *info)
1526{
1527        char name[MAX_NAME_LEN];
1528        int rc;
1529
1530        scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1531        info->request_cache =
1532                kmem_cache_create(
1533                        name,
1534                        sizeof(struct smbd_request) +
1535                                sizeof(struct smbd_data_transfer),
1536                        0, SLAB_HWCACHE_ALIGN, NULL);
1537        if (!info->request_cache)
1538                return -ENOMEM;
1539
1540        info->request_mempool =
1541                mempool_create(info->send_credit_target, mempool_alloc_slab,
1542                        mempool_free_slab, info->request_cache);
1543        if (!info->request_mempool)
1544                goto out1;
1545
1546        scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1547        info->response_cache =
1548                kmem_cache_create(
1549                        name,
1550                        sizeof(struct smbd_response) +
1551                                info->max_receive_size,
1552                        0, SLAB_HWCACHE_ALIGN, NULL);
1553        if (!info->response_cache)
1554                goto out2;
1555
1556        info->response_mempool =
1557                mempool_create(info->receive_credit_max, mempool_alloc_slab,
1558                       mempool_free_slab, info->response_cache);
1559        if (!info->response_mempool)
1560                goto out3;
1561
1562        scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1563        info->workqueue = create_workqueue(name);
1564        if (!info->workqueue)
1565                goto out4;
1566
1567        rc = allocate_receive_buffers(info, info->receive_credit_max);
1568        if (rc) {
1569                log_rdma_event(ERR, "failed to allocate receive buffers\n");
1570                goto out5;
1571        }
1572
1573        return 0;
1574
1575out5:
1576        destroy_workqueue(info->workqueue);
1577out4:
1578        mempool_destroy(info->response_mempool);
1579out3:
1580        kmem_cache_destroy(info->response_cache);
1581out2:
1582        mempool_destroy(info->request_mempool);
1583out1:
1584        kmem_cache_destroy(info->request_cache);
1585        return -ENOMEM;
1586}
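
/*
 * Illustrative sketch (hypothetical names, simplified error handling): the
 * slab-backed mempool pattern used by allocate_caches_and_workqueue() above.
 * Sizing the pool to the credit limit guarantees that allocations for
 * in-flight packets cannot fail, which is why the request/response pools are
 * created with send_credit_target/receive_credit_max as min_nr.
 */
struct my_obj {
        int payload;
};

static int my_create_pool(struct kmem_cache **cachep, mempool_t **poolp,
                          int min_nr)
{
        *cachep = kmem_cache_create("my_obj_cache", sizeof(struct my_obj),
                                    0, SLAB_HWCACHE_ALIGN, NULL);
        if (!*cachep)
                return -ENOMEM;

        *poolp = mempool_create(min_nr, mempool_alloc_slab,
                                mempool_free_slab, *cachep);
        if (!*poolp) {
                kmem_cache_destroy(*cachep);
                return -ENOMEM;
        }
        return 0;
}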
1587
1588/* Create a SMBD connection, called by upper layer */
1589static struct smbd_connection *_smbd_get_connection(
1590        struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1591{
1592        int rc;
1593        struct smbd_connection *info;
1594        struct rdma_conn_param conn_param;
1595        struct ib_qp_init_attr qp_attr;
1596        struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1597        struct ib_port_immutable port_immutable;
1598        u32 ird_ord_hdr[2];
1599
1600        info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1601        if (!info)
1602                return NULL;
1603
1604        info->transport_status = SMBD_CONNECTING;
1605        rc = smbd_ia_open(info, dstaddr, port);
1606        if (rc) {
1607                log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1608                goto create_id_failed;
1609        }
1610
1611        if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1612            smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1613                log_rdma_event(ERR,
1614                        "consider lowering send_credit_target = %d. "
1615                        "Possible CQE overrun, device "
1616                        "reporting max_cqe %d max_qp_wr %d\n",
1617                        smbd_send_credit_target,
1618                        info->id->device->attrs.max_cqe,
1619                        info->id->device->attrs.max_qp_wr);
1620                goto config_failed;
1621        }
1622
1623        if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1624            smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1625                log_rdma_event(ERR,
1626                        "consider lowering receive_credit_max = %d. "
1627                        "Possible CQE overrun, device "
1628                        "reporting max_cqe %d max_qp_wr %d\n",
1629                        smbd_receive_credit_max,
1630                        info->id->device->attrs.max_cqe,
1631                        info->id->device->attrs.max_qp_wr);
1632                goto config_failed;
1633        }
1634
1635        info->receive_credit_max = smbd_receive_credit_max;
1636        info->send_credit_target = smbd_send_credit_target;
1637        info->max_send_size = smbd_max_send_size;
1638        info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1639        info->max_receive_size = smbd_max_receive_size;
1640        info->keep_alive_interval = smbd_keep_alive_interval;
1641
1642        if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
1643                log_rdma_event(ERR,
1644                        "warning: device max_send_sge = %d too small\n",
1645                        info->id->device->attrs.max_send_sge);
1646                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1647        }
1648        if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
1649                log_rdma_event(ERR,
1650                        "warning: device max_recv_sge = %d too small\n",
1651                        info->id->device->attrs.max_recv_sge);
1652                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1653        }
1654
1655        info->send_cq = NULL;
1656        info->recv_cq = NULL;
1657        info->send_cq =
1658                ib_alloc_cq_any(info->id->device, info,
1659                                info->send_credit_target, IB_POLL_SOFTIRQ);
1660        if (IS_ERR(info->send_cq)) {
1661                info->send_cq = NULL;
1662                goto alloc_cq_failed;
1663        }
1664
1665        info->recv_cq =
1666                ib_alloc_cq_any(info->id->device, info,
1667                                info->receive_credit_max, IB_POLL_SOFTIRQ);
1668        if (IS_ERR(info->recv_cq)) {
1669                info->recv_cq = NULL;
1670                goto alloc_cq_failed;
1671        }
1672
1673        memset(&qp_attr, 0, sizeof(qp_attr));
1674        qp_attr.event_handler = smbd_qp_async_error_upcall;
1675        qp_attr.qp_context = info;
1676        qp_attr.cap.max_send_wr = info->send_credit_target;
1677        qp_attr.cap.max_recv_wr = info->receive_credit_max;
1678        qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1679        qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1680        qp_attr.cap.max_inline_data = 0;
1681        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1682        qp_attr.qp_type = IB_QPT_RC;
1683        qp_attr.send_cq = info->send_cq;
1684        qp_attr.recv_cq = info->recv_cq;
1685        qp_attr.port_num = ~0;
1686
1687        rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1688        if (rc) {
1689                log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1690                goto create_qp_failed;
1691        }
1692
1693        memset(&conn_param, 0, sizeof(conn_param));
1694        conn_param.initiator_depth = 0;
1695
1696        conn_param.responder_resources =
1697                info->id->device->attrs.max_qp_rd_atom
1698                        < SMBD_CM_RESPONDER_RESOURCES ?
1699                info->id->device->attrs.max_qp_rd_atom :
1700                SMBD_CM_RESPONDER_RESOURCES;
1701        info->responder_resources = conn_param.responder_resources;
1702        log_rdma_mr(INFO, "responder_resources=%d\n",
1703                info->responder_resources);
1704
1705        /* Need to send IRD/ORD in private data for iWARP */
1706        info->id->device->ops.get_port_immutable(
1707                info->id->device, info->id->port_num, &port_immutable);
1708        if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1709                ird_ord_hdr[0] = info->responder_resources;
1710                ird_ord_hdr[1] = 1;
1711                conn_param.private_data = ird_ord_hdr;
1712                conn_param.private_data_len = sizeof(ird_ord_hdr);
1713        } else {
1714                conn_param.private_data = NULL;
1715                conn_param.private_data_len = 0;
1716        }
1717
1718        conn_param.retry_count = SMBD_CM_RETRY;
1719        conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1720        conn_param.flow_control = 0;
1721
1722        log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1723                &addr_in->sin_addr, port);
1724
1725        init_waitqueue_head(&info->conn_wait);
1726        init_waitqueue_head(&info->disconn_wait);
1727        init_waitqueue_head(&info->wait_reassembly_queue);
1728        rc = rdma_connect(info->id, &conn_param);
1729        if (rc) {
1730                log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1731                goto rdma_connect_failed;
1732        }
1733
1734        wait_event_interruptible(
1735                info->conn_wait, info->transport_status != SMBD_CONNECTING);
1736
1737        if (info->transport_status != SMBD_CONNECTED) {
1738                log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1739                goto rdma_connect_failed;
1740        }
1741
1742        log_rdma_event(INFO, "rdma_connect connected\n");
1743
1744        rc = allocate_caches_and_workqueue(info);
1745        if (rc) {
1746                log_rdma_event(ERR, "cache allocation failed\n");
1747                goto allocate_cache_failed;
1748        }
1749
1750        init_waitqueue_head(&info->wait_send_queue);
1751        INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1752        INIT_DELAYED_WORK(&info->send_immediate_work, send_immediate_work);
1753        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1754                info->keep_alive_interval*HZ);
1755
1756        init_waitqueue_head(&info->wait_send_pending);
1757        atomic_set(&info->send_pending, 0);
1758
1759        init_waitqueue_head(&info->wait_send_payload_pending);
1760        atomic_set(&info->send_payload_pending, 0);
1761
1762        INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1763        INIT_WORK(&info->recv_done_work, smbd_recv_done_work);
1764        INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1765        info->new_credits_offered = 0;
1766        spin_lock_init(&info->lock_new_credits_offered);
1767
1768        rc = smbd_negotiate(info);
1769        if (rc) {
1770                log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1771                goto negotiation_failed;
1772        }
1773
1774        rc = allocate_mr_list(info);
1775        if (rc) {
1776                log_rdma_mr(ERR, "memory registration allocation failed\n");
1777                goto allocate_mr_failed;
1778        }
1779
1780        return info;
1781
1782allocate_mr_failed:
1783        /* At this point, need to do a full transport shutdown */
1784        smbd_destroy(server);
1785        return NULL;
1786
1787negotiation_failed:
1788        cancel_delayed_work_sync(&info->idle_timer_work);
1789        destroy_caches_and_workqueue(info);
1790        info->transport_status = SMBD_NEGOTIATE_FAILED;
1791        init_waitqueue_head(&info->conn_wait);
1792        rdma_disconnect(info->id);
1793        wait_event(info->conn_wait,
1794                info->transport_status == SMBD_DISCONNECTED);
1795
1796allocate_cache_failed:
1797rdma_connect_failed:
1798        rdma_destroy_qp(info->id);
1799
1800create_qp_failed:
1801alloc_cq_failed:
1802        if (info->send_cq)
1803                ib_free_cq(info->send_cq);
1804        if (info->recv_cq)
1805                ib_free_cq(info->recv_cq);
1806
1807config_failed:
1808        ib_dealloc_pd(info->pd);
1809        rdma_destroy_id(info->id);
1810
1811create_id_failed:
1812        kfree(info);
1813        return NULL;
1814}
1815
1816struct smbd_connection *smbd_get_connection(
1817        struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1818{
1819        struct smbd_connection *ret;
1820        int port = SMBD_PORT;
1821
1822try_again:
1823        ret = _smbd_get_connection(server, dstaddr, port);
1824
1825        /* Try SMB_PORT if SMBD_PORT doesn't work */
1826        if (!ret && port == SMBD_PORT) {
1827                port = SMB_PORT;
1828                goto try_again;
1829        }
1830        return ret;
1831}
1832
1833/*
1834 * Receive data from receive reassembly queue
1835 * All the incoming data packets are placed in reassembly queue
1836 * buf: the buffer to read data into
1837 * size: the length of data to read
1838 * return value: actual data read
1839 * Note: this implementation copies the data from the reassembly queue to the
1840 * receive buffers used by the upper layer. This is not the optimal code path. A
1841 * better way is to not have the upper layer allocate its receive buffers, but
1842 * rather borrow a buffer from the reassembly queue and return it after the data
1843 * is consumed. But this would require more changes to the upper layer code, and
1844 * would also need to handle packet boundaries while they are still being reassembled.
1845 */
1846static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1847                unsigned int size)
1848{
1849        struct smbd_response *response;
1850        struct smbd_data_transfer *data_transfer;
1851        int to_copy, to_read, data_read, offset;
1852        u32 data_length, remaining_data_length, data_offset;
1853        int rc;
1854
1855again:
1856        /*
1857         * No need to hold the reassembly queue lock all the time as we are
1858         * the only one reading from the front of the queue. The transport
1859         * may add more entries to the back of the queue at the same time
1860         */
1861        log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1862                info->reassembly_data_length);
1863        if (info->reassembly_data_length >= size) {
1864                int queue_length;
1865                int queue_removed = 0;
1866
1867                /*
1868                 * Need to make sure reassembly_data_length is read before
1869                 * reading reassembly_queue_length and calling
1870                 * _get_first_reassembly. This call is lock free
1871                 * as we never read the end of the queue, which is being
1872                 * updated in SOFTIRQ context as more data is received
1873                 */
1874                virt_rmb();
1875                queue_length = info->reassembly_queue_length;
1876                data_read = 0;
1877                to_read = size;
1878                offset = info->first_entry_offset;
1879                while (data_read < size) {
1880                        response = _get_first_reassembly(info);
1881                        data_transfer = smbd_response_payload(response);
1882                        data_length = le32_to_cpu(data_transfer->data_length);
1883                        remaining_data_length =
1884                                le32_to_cpu(
1885                                        data_transfer->remaining_data_length);
1886                        data_offset = le32_to_cpu(data_transfer->data_offset);
1887
1888                        /*
1889                         * The upper layer expects the RFC1002 length at the
1890                         * beginning of the payload. Return it to indicate
1891                         * the total length of the packet. This minimizes the
1892                         * change to the upper layer packet processing logic.
1893                         * This will eventually be removed when an
1894                         * intermediate transport layer is added
1895                         */
1896                        if (response->first_segment && size == 4) {
1897                                unsigned int rfc1002_len =
1898                                        data_length + remaining_data_length;
1899                                *((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1900                                data_read = 4;
1901                                response->first_segment = false;
1902                                log_read(INFO, "returning rfc1002 length %d\n",
1903                                        rfc1002_len);
1904                                goto read_rfc1002_done;
1905                        }
1906
1907                        to_copy = min_t(int, data_length - offset, to_read);
1908                        memcpy(
1909                                buf + data_read,
1910                                (char *)data_transfer + data_offset + offset,
1911                                to_copy);
1912
1913                        /* move on to the next buffer? */
1914                        if (to_copy == data_length - offset) {
1915                                queue_length--;
1916                                /*
1917                                 * No need to lock if we are not at the
1918                                 * end of the queue
1919                                 */
1920                                if (queue_length)
1921                                        list_del(&response->list);
1922                                else {
1923                                        spin_lock_irq(
1924                                                &info->reassembly_queue_lock);
1925                                        list_del(&response->list);
1926                                        spin_unlock_irq(
1927                                                &info->reassembly_queue_lock);
1928                                }
1929                                queue_removed++;
1930                                info->count_reassembly_queue--;
1931                                info->count_dequeue_reassembly_queue++;
1932                                put_receive_buffer(info, response);
1933                                offset = 0;
1934                                log_read(INFO, "put_receive_buffer offset=0\n");
1935                        } else
1936                                offset += to_copy;
1937
1938                        to_read -= to_copy;
1939                        data_read += to_copy;
1940
1941                        log_read(INFO, "_get_first_reassembly memcpy %d bytes "
1942                                "data_transfer_length-offset=%d after that "
1943                                "to_read=%d data_read=%d offset=%d\n",
1944                                to_copy, data_length - offset,
1945                                to_read, data_read, offset);
1946                }
1947
1948                spin_lock_irq(&info->reassembly_queue_lock);
1949                info->reassembly_data_length -= data_read;
1950                info->reassembly_queue_length -= queue_removed;
1951                spin_unlock_irq(&info->reassembly_queue_lock);
1952
1953                info->first_entry_offset = offset;
1954                log_read(INFO, "returning to thread data_read=%d "
1955                        "reassembly_data_length=%d first_entry_offset=%d\n",
1956                        data_read, info->reassembly_data_length,
1957                        info->first_entry_offset);
1958read_rfc1002_done:
1959                return data_read;
1960        }
1961
1962        log_read(INFO, "wait_event on more data\n");
1963        rc = wait_event_interruptible(
1964                info->wait_reassembly_queue,
1965                info->reassembly_data_length >= size ||
1966                        info->transport_status != SMBD_CONNECTED);
1967        /* Don't return any data if interrupted */
1968        if (rc)
1969                return rc;
1970
1971        if (info->transport_status != SMBD_CONNECTED) {
1972                log_read(ERR, "disconnected\n");
1973                return 0;
1974        }
1975
1976        goto again;
1977}
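
/*
 * Illustrative sketch (hypothetical caller, simplified): how a consumer of
 * smbd_recv_buf() reads one SMB message under the convention implemented
 * above -- a 4-byte read returns the RFC1002 length of the packet, and the
 * following read returns that many bytes of payload.  The real consumer goes
 * through smbd_recv() below instead of calling smbd_recv_buf() directly.
 */
static int my_read_one_message(struct smbd_connection *info, char *buf,
                               unsigned int buflen)
{
        __be32 rfc1002_hdr;
        unsigned int len;
        int rc;

        /* the size == 4 special case returns the total packet length */
        rc = smbd_recv_buf(info, (char *)&rfc1002_hdr, 4);
        if (rc < 0)
                return rc;
        if (rc != 4)
                return -ECONNABORTED;

        len = be32_to_cpu(rfc1002_hdr);
        if (len > buflen)
                return -EMSGSIZE;

        /* then read the advertised number of payload bytes */
        return smbd_recv_buf(info, buf, len);
}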
1978
1979/*
1980 * Receive a page from receive reassembly queue
1981 * page: the page to read data into
1982 * to_read: the length of data to read
1983 * return value: actual data read
1984 */
1985static int smbd_recv_page(struct smbd_connection *info,
1986                struct page *page, unsigned int page_offset,
1987                unsigned int to_read)
1988{
1989        int ret;
1990        char *to_address;
1991        void *page_address;
1992
1993        /* make sure we have the page ready for read */
1994        ret = wait_event_interruptible(
1995                info->wait_reassembly_queue,
1996                info->reassembly_data_length >= to_read ||
1997                        info->transport_status != SMBD_CONNECTED);
1998        if (ret)
1999                return ret;
2000
2001        /* now we can read from reassembly queue and not sleep */
2002        page_address = kmap_atomic(page);
2003        to_address = (char *) page_address + page_offset;
2004
2005        log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
2006                page, to_address, to_read);
2007
2008        ret = smbd_recv_buf(info, to_address, to_read);
2009        kunmap_atomic(page_address);
2010
2011        return ret;
2012}
2013
2014/*
2015 * Receive data from transport
2016 * msg: a msghdr pointing to the buffer; can be ITER_KVEC or ITER_BVEC
2017 * return: total bytes read, or 0. SMB Direct will not do partial read.
2018 */
2019int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
2020{
2021        char *buf;
2022        struct page *page;
2023        unsigned int to_read, page_offset;
2024        int rc;
2025
2026        if (iov_iter_rw(&msg->msg_iter) == WRITE) {
2027                /* It's a bug in the upper layer to get here */
2028                cifs_dbg(VFS, "CIFS: invalid msg iter dir %u\n",
2029                         iov_iter_rw(&msg->msg_iter));
2030                rc = -EINVAL;
2031                goto out;
2032        }
2033
2034        switch (iov_iter_type(&msg->msg_iter)) {
2035        case ITER_KVEC:
2036                buf = msg->msg_iter.kvec->iov_base;
2037                to_read = msg->msg_iter.kvec->iov_len;
2038                rc = smbd_recv_buf(info, buf, to_read);
2039                break;
2040
2041        case ITER_BVEC:
2042                page = msg->msg_iter.bvec->bv_page;
2043                page_offset = msg->msg_iter.bvec->bv_offset;
2044                to_read = msg->msg_iter.bvec->bv_len;
2045                rc = smbd_recv_page(info, page, page_offset, to_read);
2046                break;
2047
2048        default:
2049                /* It's a bug in the upper layer to get here */
2050                cifs_dbg(VFS, "CIFS: invalid msg type %d\n",
2051                         iov_iter_type(&msg->msg_iter));
2052                rc = -EINVAL;
2053        }
2054
2055out:
2056        /* SMBDirect will read it all or nothing */
2057        if (rc > 0)
2058                msg->msg_iter.count = 0;
2059        return rc;
2060}
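
/*
 * Illustrative sketch (hypothetical helper; assumes the iov_iter_kvec()
 * signature contemporary with this file): receiving into a flat buffer
 * through smbd_recv() by describing the buffer with a single-segment
 * ITER_KVEC iterator, which is the first case handled in the switch above.
 */
static int my_recv_into_buf(struct smbd_connection *info, void *buf,
                            size_t len)
{
        struct msghdr msg = {};
        struct kvec vec = { .iov_base = buf, .iov_len = len };

        /* READ direction: data flows from the transport into the buffer */
        iov_iter_kvec(&msg.msg_iter, READ, &vec, 1, len);
        return smbd_recv(info, &msg);
}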
2061
2062/*
2063 * Send data to transport
2064 * Each rqst is transported as an SMBDirect payload
2065 * rqst: the data to write
2066 * return value: 0 if successfully written, otherwise error code
2067 */
2068int smbd_send(struct TCP_Server_Info *server,
2069        int num_rqst, struct smb_rqst *rqst_array)
2070{
2071        struct smbd_connection *info = server->smbd_conn;
2072        struct kvec vec;
2073        int nvecs;
2074        int size;
2075        unsigned int buflen, remaining_data_length;
2076        int start, i, j;
2077        int max_iov_size =
2078                info->max_send_size - sizeof(struct smbd_data_transfer);
2079        struct kvec *iov;
2080        int rc;
2081        struct smb_rqst *rqst;
2082        int rqst_idx;
2083
2084        if (info->transport_status != SMBD_CONNECTED) {
2085                rc = -EAGAIN;
2086                goto done;
2087        }
2088
2089        /*
2090         * Add in the page array if there is one. The caller needs to set
2091         * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
2092         * ends at a page boundary
2093         */
2094        remaining_data_length = 0;
2095        for (i = 0; i < num_rqst; i++)
2096                remaining_data_length += smb_rqst_len(server, &rqst_array[i]);
2097
2098        if (remaining_data_length + sizeof(struct smbd_data_transfer) >
2099                info->max_fragmented_send_size) {
2100                log_write(ERR, "payload size %d > max size %d\n",
2101                        remaining_data_length, info->max_fragmented_send_size);
2102                rc = -EINVAL;
2103                goto done;
2104        }
2105
2106        log_write(INFO, "num_rqst=%d total length=%u\n",
2107                        num_rqst, remaining_data_length);
2108
2109        rqst_idx = 0;
2110next_rqst:
2111        rqst = &rqst_array[rqst_idx];
2112        iov = rqst->rq_iov;
2113
2114        cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
2115                rqst_idx, smb_rqst_len(server, rqst));
2116        for (i = 0; i < rqst->rq_nvec; i++)
2117                dump_smb(iov[i].iov_base, iov[i].iov_len);
2118
2119
2120        log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d "
2121                "rq_tailsz=%d buflen=%lu\n",
2122                rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2123                rqst->rq_tailsz, smb_rqst_len(server, rqst));
2124
2125        start = i = 0;
2126        buflen = 0;
2127        while (true) {
2128                buflen += iov[i].iov_len;
2129                if (buflen > max_iov_size) {
2130                        if (i > start) {
2131                                remaining_data_length -=
2132                                        (buflen-iov[i].iov_len);
2133                                log_write(INFO, "sending iov[] from start=%d "
2134                                        "i=%d nvecs=%d "
2135                                        "remaining_data_length=%d\n",
2136                                        start, i, i-start,
2137                                        remaining_data_length);
2138                                rc = smbd_post_send_data(
2139                                        info, &iov[start], i-start,
2140                                        remaining_data_length);
2141                                if (rc)
2142                                        goto done;
2143                        } else {
2144                                /* iov[start] is too big, break it */
2145                                nvecs = (buflen+max_iov_size-1)/max_iov_size;
2146                                log_write(INFO, "iov[%d] iov_base=%p buflen=%d"
2147                                        " break to %d vectors\n",
2148                                        start, iov[start].iov_base,
2149                                        buflen, nvecs);
2150                                for (j = 0; j < nvecs; j++) {
2151                                        vec.iov_base =
2152                                                (char *)iov[start].iov_base +
2153                                                j*max_iov_size;
2154                                        vec.iov_len = max_iov_size;
2155                                        if (j == nvecs-1)
2156                                                vec.iov_len =
2157                                                        buflen -
2158                                                        max_iov_size*(nvecs-1);
2159                                        remaining_data_length -= vec.iov_len;
2160                                        log_write(INFO,
2161                                                "sending vec j=%d iov_base=%p"
2162                                                " iov_len=%zu "
2163                                                "remaining_data_length=%d\n",
2164                                                j, vec.iov_base, vec.iov_len,
2165                                                remaining_data_length);
2166                                        rc = smbd_post_send_data(
2167                                                info, &vec, 1,
2168                                                remaining_data_length);
2169                                        if (rc)
2170                                                goto done;
2171                                }
2172                                i++;
2173                                if (i == rqst->rq_nvec)
2174                                        break;
2175                        }
2176                        start = i;
2177                        buflen = 0;
2178                } else {
2179                        i++;
2180                        if (i == rqst->rq_nvec) {
2181                                /* send out all remaining vecs */
2182                                remaining_data_length -= buflen;
2183                                log_write(INFO,
2184                                        "sending iov[] from start=%d i=%d "
2185                                        "nvecs=%d remaining_data_length=%d\n",
2186                                        start, i, i-start,
2187                                        remaining_data_length);
2188                                rc = smbd_post_send_data(info, &iov[start],
2189                                        i-start, remaining_data_length);
2190                                if (rc)
2191                                        goto done;
2192                                break;
2193                        }
2194                }
2195                log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2196        }
2197
2198        /* now sending pages if there are any */
2199        for (i = 0; i < rqst->rq_npages; i++) {
2200                unsigned int offset;
2201
2202                rqst_page_get_length(rqst, i, &buflen, &offset);
2203                nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2204                log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2205                        buflen, nvecs);
2206                for (j = 0; j < nvecs; j++) {
2207                        size = max_iov_size;
2208                        if (j == nvecs-1)
2209                                size = buflen - j*max_iov_size;
2210                        remaining_data_length -= size;
2211                        log_write(INFO, "sending pages i=%d offset=%d size=%d"
2212                                " remaining_data_length=%d\n",
2213                                i, j*max_iov_size+offset, size,
2214                                remaining_data_length);
2215                        rc = smbd_post_send_page(
2216                                info, rqst->rq_pages[i],
2217                                j*max_iov_size + offset,
2218                                size, remaining_data_length);
2219                        if (rc)
2220                                goto done;
2221                }
2222        }
2223
2224        rqst_idx++;
2225        if (rqst_idx < num_rqst)
2226                goto next_rqst;
2227
2228done:
2229        /*
2230         * As an optimization, we don't wait for individual I/O to finish
2231         * before sending the next one.
2232         * Send them all and wait for the pending send count to reach 0,
2233         * which means all the I/Os have gone out and we are good to return
2234         */
2235
2236        wait_event(info->wait_send_payload_pending,
2237                atomic_read(&info->send_payload_pending) == 0);
2238
2239        return rc;
2240}
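
/*
 * Illustrative sketch (made-up example values): the chunking arithmetic
 * smbd_send() applies when a single iov or page is larger than
 * max_iov_size -- round up to nvecs chunks, with only the last chunk
 * shorter than max_iov_size.
 */
static void my_chunk_example(void)
{
        unsigned int buflen = 10000;            /* example payload length */
        unsigned int max_iov_size = 4096;       /* example per-send limit */
        unsigned int nvecs = (buflen + max_iov_size - 1) / max_iov_size;
        unsigned int j, size;

        for (j = 0; j < nvecs; j++) {
                size = (j == nvecs - 1) ?
                        buflen - j * max_iov_size : max_iov_size;
                /* with these values: 4096, 4096, 1808 */
                pr_debug("chunk %u: %u bytes\n", j, size);
        }
}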
2241
2242static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2243{
2244        struct smbd_mr *mr;
2245        struct ib_cqe *cqe;
2246
2247        if (wc->status) {
2248                log_rdma_mr(ERR, "status=%d\n", wc->status);
2249                cqe = wc->wr_cqe;
2250                mr = container_of(cqe, struct smbd_mr, cqe);
2251                smbd_disconnect_rdma_connection(mr->conn);
2252        }
2253}
2254
2255/*
2256 * The work queue function that recovers MRs
2257 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2258 * again. Both calls are slow, so finish them in a workqueue. This will not
2259 * block the I/O path.
2260 * There is one workqueue that recovers MRs, so there is no need to lock as the
2261 * I/O requests calling smbd_register_mr will never update the links in the
2262 * mr_list.
2263 */
2264static void smbd_mr_recovery_work(struct work_struct *work)
2265{
2266        struct smbd_connection *info =
2267                container_of(work, struct smbd_connection, mr_recovery_work);
2268        struct smbd_mr *smbdirect_mr;
2269        int rc;
2270
2271        list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2272                if (smbdirect_mr->state == MR_INVALIDATED)
2273                        ib_dma_unmap_sg(
2274                                info->id->device, smbdirect_mr->sgl,
2275                                smbdirect_mr->sgl_count,
2276                                smbdirect_mr->dir);
2277                else if (smbdirect_mr->state == MR_ERROR) {
2278
2279                        /* recover this MR entry */
2280                        rc = ib_dereg_mr(smbdirect_mr->mr);
2281                        if (rc) {
2282                                log_rdma_mr(ERR,
2283                                        "ib_dereg_mr failed rc=%x\n",
2284                                        rc);
2285                                smbd_disconnect_rdma_connection(info);
2286                                continue;
2287                        }
2288
2289                        smbdirect_mr->mr = ib_alloc_mr(
2290                                info->pd, info->mr_type,
2291                                info->max_frmr_depth);
2292                        if (IS_ERR(smbdirect_mr->mr)) {
2293                                log_rdma_mr(ERR,
2294                                        "ib_alloc_mr failed mr_type=%x "
2295                                        "max_frmr_depth=%x\n",
2296                                        info->mr_type,
2297                                        info->max_frmr_depth);
2298                                smbd_disconnect_rdma_connection(info);
2299                                continue;
2300                        }
2301                } else
2302                        /* This MR is being used, don't recover it */
2303                        continue;
2304
2305                smbdirect_mr->state = MR_READY;
2306
2307                /* smbdirect_mr->state is updated by this function
2308                 * and is read and updated by I/O issuing CPUs trying
2309                 * to get an MR. The call to atomic_inc_return
2310                 * implies a memory barrier and guarantees this
2311                 * value is updated before waking up any calls to
2312                 * get_mr() from the I/O issuing CPUs
2313                 */
2314                if (atomic_inc_return(&info->mr_ready_count) == 1)
2315                        wake_up_interruptible(&info->wait_mr);
2316        }
2317}
2318
2319static void destroy_mr_list(struct smbd_connection *info)
2320{
2321        struct smbd_mr *mr, *tmp;
2322
2323        cancel_work_sync(&info->mr_recovery_work);
2324        list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2325                if (mr->state == MR_INVALIDATED)
2326                        ib_dma_unmap_sg(info->id->device, mr->sgl,
2327                                mr->sgl_count, mr->dir);
2328                ib_dereg_mr(mr->mr);
2329                kfree(mr->sgl);
2330                kfree(mr);
2331        }
2332}
2333
2334/*
2335 * Allocate MRs used for RDMA read/write
2336 * The number of MRs will not exceed hardware capability in responder_resources
2337 * All MRs are kept in mr_list. An MR can be recovered after it's used
2338 * Recovery is done in smbd_mr_recovery_work. The content of a list entry changes
2339 * as MRs are used and recovered for I/O, but the list links will not change
2340 */
2341static int allocate_mr_list(struct smbd_connection *info)
2342{
2343        int i;
2344        struct smbd_mr *smbdirect_mr, *tmp;
2345
2346        INIT_LIST_HEAD(&info->mr_list);
2347        init_waitqueue_head(&info->wait_mr);
2348        spin_lock_init(&info->mr_list_lock);
2349        atomic_set(&info->mr_ready_count, 0);
2350        atomic_set(&info->mr_used_count, 0);
2351        init_waitqueue_head(&info->wait_for_mr_cleanup);
2352        /* Allocate more MRs (2x) than hardware responder_resources */
2353        for (i = 0; i < info->responder_resources * 2; i++) {
2354                smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2355                if (!smbdirect_mr)
2356                        goto out;
2357                smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2358                                        info->max_frmr_depth);
2359                if (IS_ERR(smbdirect_mr->mr)) {
2360                        log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
2361                                "max_frmr_depth=%x\n",
2362                                info->mr_type, info->max_frmr_depth);
2363                        goto out;
2364                }
2365                smbdirect_mr->sgl = kcalloc(
2366                                        info->max_frmr_depth,
2367                                        sizeof(struct scatterlist),
2368                                        GFP_KERNEL);
2369                if (!smbdirect_mr->sgl) {
2370                        log_rdma_mr(ERR, "failed to allocate sgl\n");
2371                        ib_dereg_mr(smbdirect_mr->mr);
2372                        goto out;
2373                }
2374                smbdirect_mr->state = MR_READY;
2375                smbdirect_mr->conn = info;
2376
2377                list_add_tail(&smbdirect_mr->list, &info->mr_list);
2378                atomic_inc(&info->mr_ready_count);
2379        }
2380        INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2381        return 0;
2382
2383out:
2384        kfree(smbdirect_mr);
2385
2386        list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2387                ib_dereg_mr(smbdirect_mr->mr);
2388                kfree(smbdirect_mr->sgl);
2389                kfree(smbdirect_mr);
2390        }
2391        return -ENOMEM;
2392}
2393
2394/*
2395 * Get an MR from mr_list. This function waits until there is at least one
2396 * MR available in the list. It may access the list while the
2397 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2398 * as they never modify the same places. However, there may be several CPUs
2399 * issuing I/O and trying to get an MR at the same time, so mr_list_lock is
2400 * used to protect against this.
2401 */
2402static struct smbd_mr *get_mr(struct smbd_connection *info)
2403{
2404        struct smbd_mr *ret;
2405        int rc;
2406again:
2407        rc = wait_event_interruptible(info->wait_mr,
2408                atomic_read(&info->mr_ready_count) ||
2409                info->transport_status != SMBD_CONNECTED);
2410        if (rc) {
2411                log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2412                return NULL;
2413        }
2414
2415        if (info->transport_status != SMBD_CONNECTED) {
2416                log_rdma_mr(ERR, "info->transport_status=%x\n",
2417                        info->transport_status);
2418                return NULL;
2419        }
2420
2421        spin_lock(&info->mr_list_lock);
2422        list_for_each_entry(ret, &info->mr_list, list) {
2423                if (ret->state == MR_READY) {
2424                        ret->state = MR_REGISTERED;
2425                        spin_unlock(&info->mr_list_lock);
2426                        atomic_dec(&info->mr_ready_count);
2427                        atomic_inc(&info->mr_used_count);
2428                        return ret;
2429                }
2430        }
2431
2432        spin_unlock(&info->mr_list_lock);
2433        /*
2434         * It is possible that we could fail to get an MR because other processes
2435         * may acquire an MR at the same time. If this is the case, retry.
2436         */
2437        goto again;
2438}
2439
2440/*
2441 * Register memory for RDMA read/write
2442 * pages[]: the list of pages to register memory with
2443 * num_pages: the number of pages to register
2444 * tailsz: if non-zero, the bytes to register in the last page
2445 * writing: true if this is an RDMA write (SMB read), false for RDMA read
2446 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2447 * return value: the MR registered, NULL if failed.
2448 */
2449struct smbd_mr *smbd_register_mr(
2450        struct smbd_connection *info, struct page *pages[], int num_pages,
2451        int offset, int tailsz, bool writing, bool need_invalidate)
2452{
2453        struct smbd_mr *smbdirect_mr;
2454        int rc, i;
2455        enum dma_data_direction dir;
2456        struct ib_reg_wr *reg_wr;
2457
2458        if (num_pages > info->max_frmr_depth) {
2459                log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2460                        num_pages, info->max_frmr_depth);
2461                return NULL;
2462        }
2463
2464        smbdirect_mr = get_mr(info);
2465        if (!smbdirect_mr) {
2466                log_rdma_mr(ERR, "get_mr returning NULL\n");
2467                return NULL;
2468        }
2469        smbdirect_mr->need_invalidate = need_invalidate;
2470        smbdirect_mr->sgl_count = num_pages;
2471        sg_init_table(smbdirect_mr->sgl, num_pages);
2472
2473        log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
2474                        num_pages, offset, tailsz);
2475
2476        if (num_pages == 1) {
2477                sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
2478                goto skip_multiple_pages;
2479        }
2480
2481        /* We have at least two pages to register */
2482        sg_set_page(
2483                &smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
2484        i = 1;
2485        while (i < num_pages - 1) {
2486                sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2487                i++;
2488        }
2489        sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2490                tailsz ? tailsz : PAGE_SIZE, 0);
2491
2492skip_multiple_pages:
2493        dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2494        smbdirect_mr->dir = dir;
2495        rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2496        if (!rc) {
2497                log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2498                        num_pages, dir, rc);
2499                goto dma_map_error;
2500        }
2501
2502        rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2503                NULL, PAGE_SIZE);
2504        if (rc != num_pages) {
2505                log_rdma_mr(ERR,
2506                        "ib_map_mr_sg failed rc = %d num_pages = %x\n",
2507                        rc, num_pages);
2508                goto map_mr_error;
2509        }
2510
2511        ib_update_fast_reg_key(smbdirect_mr->mr,
2512                ib_inc_rkey(smbdirect_mr->mr->rkey));
2513        reg_wr = &smbdirect_mr->wr;
2514        reg_wr->wr.opcode = IB_WR_REG_MR;
2515        smbdirect_mr->cqe.done = register_mr_done;
2516        reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2517        reg_wr->wr.num_sge = 0;
2518        reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2519        reg_wr->mr = smbdirect_mr->mr;
2520        reg_wr->key = smbdirect_mr->mr->rkey;
2521        reg_wr->access = writing ?
2522                        IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2523                        IB_ACCESS_REMOTE_READ;
2524
2525        /*
2526         * There is no need to wait for completion of ib_post_send
2527         * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2528         * on the next ib_post_send when we actually send I/O to the remote peer
2529         */
2530        rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
2531        if (!rc)
2532                return smbdirect_mr;
2533
2534        log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2535                rc, reg_wr->key);
2536
2537        /* If all failed, attempt to recover this MR by setting it to MR_ERROR */
2538map_mr_error:
2539        ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2540                smbdirect_mr->sgl_count, smbdirect_mr->dir);
2541
2542dma_map_error:
2543        smbdirect_mr->state = MR_ERROR;
2544        if (atomic_dec_and_test(&info->mr_used_count))
2545                wake_up(&info->wait_for_mr_cleanup);
2546
2547        smbd_disconnect_rdma_connection(info);
2548
2549        return NULL;
2550}
2551
2552static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2553{
2554        struct smbd_mr *smbdirect_mr;
2555        struct ib_cqe *cqe;
2556
2557        cqe = wc->wr_cqe;
2558        smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2559        smbdirect_mr->state = MR_INVALIDATED;
2560        if (wc->status != IB_WC_SUCCESS) {
2561                log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2562                smbdirect_mr->state = MR_ERROR;
2563        }
2564        complete(&smbdirect_mr->invalidate_done);
2565}
2566
2567/*
2568 * Deregister an MR after I/O is done
2569 * This function may wait if remote invalidation is not used
2570 * and we have to locally invalidate the buffer to prevent the data from being
2571 * modified by the remote peer after the upper layer consumes it
2572 */
2573int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2574{
2575        struct ib_send_wr *wr;
2576        struct smbd_connection *info = smbdirect_mr->conn;
2577        int rc = 0;
2578
2579        if (smbdirect_mr->need_invalidate) {
2580                /* Need to finish local invalidation before returning */
2581                wr = &smbdirect_mr->inv_wr;
2582                wr->opcode = IB_WR_LOCAL_INV;
2583                smbdirect_mr->cqe.done = local_inv_done;
2584                wr->wr_cqe = &smbdirect_mr->cqe;
2585                wr->num_sge = 0;
2586                wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2587                wr->send_flags = IB_SEND_SIGNALED;
2588
2589                init_completion(&smbdirect_mr->invalidate_done);
2590                rc = ib_post_send(info->id->qp, wr, NULL);
2591                if (rc) {
2592                        log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2593                        smbd_disconnect_rdma_connection(info);
2594                        goto done;
2595                }
2596                wait_for_completion(&smbdirect_mr->invalidate_done);
2597                smbdirect_mr->need_invalidate = false;
2598        } else
2599                /*
2600                 * For remote invalidation, just set it to MR_INVALIDATED
2601                 * and defer to mr_recovery_work to recover the MR for next use
2602                 */
2603                smbdirect_mr->state = MR_INVALIDATED;
2604
2605        /*
2606         * Schedule the work to do MR recovery for future I/Os
2607         * MR recovery is slow and we don't want it to block the current I/O
2608         */
2609        queue_work(info->workqueue, &info->mr_recovery_work);
2610
2611done:
2612        if (atomic_dec_and_test(&info->mr_used_count))
2613                wake_up(&info->wait_for_mr_cleanup);
2614
2615        return rc;
2616}
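
/*
 * Illustrative sketch (hypothetical caller, simplified): the expected life
 * cycle of an MR around one RDMA-offloaded I/O.  The real callers live in
 * the SMB2 read/write paths; they embed mr->mr->rkey, mr->mr->iova and
 * mr->mr->length in the request so the server can perform the RDMA
 * transfer directly, then deregister the MR once the response arrives.
 */
static int my_rdma_read_example(struct smbd_connection *info,
                                struct page **pages, int npages,
                                unsigned int tailsz)
{
        struct smbd_mr *mr;

        /* writing=true: the server will RDMA-write into these pages */
        mr = smbd_register_mr(info, pages, npages, 0, tailsz, true, true);
        if (!mr)
                return -EAGAIN;

        /*
         * ... send the upper-layer request carrying mr->mr->rkey,
         * mr->mr->iova and mr->mr->length, then wait for the response ...
         */

        /* locally invalidate (if needed) and hand the MR back for recovery */
        return smbd_deregister_mr(mr);
}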
2617