linux/fs/cifs/smbdirect.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 *   Copyright (C) 2017, Microsoft Corporation.
   4 *
   5 *   Author(s): Long Li <longli@microsoft.com>
   6 */
   7#include <linux/module.h>
   8#include <linux/highmem.h>
   9#include "smbdirect.h"
  10#include "cifs_debug.h"
  11#include "cifsproto.h"
  12#include "smb2proto.h"
  13
  14static struct smbd_response *get_empty_queue_buffer(
  15                struct smbd_connection *info);
  16static struct smbd_response *get_receive_buffer(
  17                struct smbd_connection *info);
  18static void put_receive_buffer(
  19                struct smbd_connection *info,
  20                struct smbd_response *response);
  21static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  22static void destroy_receive_buffers(struct smbd_connection *info);
  23
  24static void put_empty_packet(
  25                struct smbd_connection *info, struct smbd_response *response);
  26static void enqueue_reassembly(
  27                struct smbd_connection *info,
  28                struct smbd_response *response, int data_length);
  29static struct smbd_response *_get_first_reassembly(
  30                struct smbd_connection *info);
  31
  32static int smbd_post_recv(
  33                struct smbd_connection *info,
  34                struct smbd_response *response);
  35
  36static int smbd_post_send_empty(struct smbd_connection *info);
  37static int smbd_post_send_data(
  38                struct smbd_connection *info,
  39                struct kvec *iov, int n_vec, int remaining_data_length);
  40static int smbd_post_send_page(struct smbd_connection *info,
  41                struct page *page, unsigned long offset,
  42                size_t size, int remaining_data_length);
  43
  44static void destroy_mr_list(struct smbd_connection *info);
  45static int allocate_mr_list(struct smbd_connection *info);
  46
  47/* SMBD version number */
  48#define SMBD_V1 0x0100
  49
  50/* Port numbers for SMBD transport */
  51#define SMB_PORT        445
  52#define SMBD_PORT       5445
  53
  54/* Address lookup and resolve timeout in ms */
  55#define RDMA_RESOLVE_TIMEOUT    5000
  56
  57/* SMBD negotiation timeout in seconds */
  58#define SMBD_NEGOTIATE_TIMEOUT  120
  59
   60/* SMBD minimum receive size and fragmented size as defined in [MS-SMBD] */
  61#define SMBD_MIN_RECEIVE_SIZE           128
  62#define SMBD_MIN_FRAGMENTED_SIZE        131072
  63
  64/*
  65 * Default maximum number of RDMA read/write outstanding on this connection
   66 * This value may be decreased during QP creation if it exceeds the hardware limit
  67 */
  68#define SMBD_CM_RESPONDER_RESOURCES     32
  69
  70/* Maximum number of retries on data transfer operations */
  71#define SMBD_CM_RETRY                   6
  72/* No need to retry on Receiver Not Ready since SMBD manages credits */
  73#define SMBD_CM_RNR_RETRY               0
  74
  75/*
  76 * User configurable initial values per SMBD transport connection
  77 * as defined in [MS-SMBD] 3.1.1.1
  78 * Those may change after a SMBD negotiation
  79 */
  80/* The local peer's maximum number of credits to grant to the peer */
  81int smbd_receive_credit_max = 255;
  82
   83/* The number of send credits the local peer requests from the remote peer */
  84int smbd_send_credit_target = 255;
  85
   86/* The maximum size of a single message that can be sent to the remote peer */
  87int smbd_max_send_size = 1364;
  88
  89/*  The maximum fragmented upper-layer payload receive size supported */
  90int smbd_max_fragmented_recv_size = 1024 * 1024;
  91
  92/*  The maximum single-message size which can be received */
  93int smbd_max_receive_size = 8192;
  94
  95/* The timeout to initiate send of a keepalive message on idle */
  96int smbd_keep_alive_interval = 120;
  97
  98/*
  99 * User configurable initial values for RDMA transport
 100 * The actual values used may be lower and are limited to hardware capabilities
 101 */
 102/* Default maximum number of SGEs in a RDMA write/read */
 103int smbd_max_frmr_depth = 2048;
 104
  105/* If the payload is smaller than this many bytes, use RDMA send/recv instead of read/write */
 106int rdma_readwrite_threshold = 4096;
 107
 108/* Transport logging functions
  109 * Logging is organized into classes. Classes can be OR'ed together to select
  110 * what is logged via the module parameter smbd_logging_class
 111 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 112 * log_rdma_event()
 113 */
 114#define LOG_OUTGOING                    0x1
 115#define LOG_INCOMING                    0x2
 116#define LOG_READ                        0x4
 117#define LOG_WRITE                       0x8
 118#define LOG_RDMA_SEND                   0x10
 119#define LOG_RDMA_RECV                   0x20
 120#define LOG_KEEP_ALIVE                  0x40
 121#define LOG_RDMA_EVENT                  0x80
 122#define LOG_RDMA_MR                     0x100
 123static unsigned int smbd_logging_class;
 124module_param(smbd_logging_class, uint, 0644);
 125MODULE_PARM_DESC(smbd_logging_class,
 126        "Logging class for SMBD transport 0x0 to 0x100");
 127
 128#define ERR             0x0
 129#define INFO            0x1
 130static unsigned int smbd_logging_level = ERR;
 131module_param(smbd_logging_level, uint, 0644);
 132MODULE_PARM_DESC(smbd_logging_level,
 133        "Logging level for SMBD transport, 0 (default): error, 1: info");
 134
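     /*
      * log_rdma() emits a message when either its level is at or below
      * smbd_logging_level, or its class bit is set in smbd_logging_class.
      * With the defaults (level 0, class 0) only ERR messages are logged;
      * e.g. smbd_logging_class=0x20 additionally logs all log_rdma_recv()
      * messages regardless of level.
      */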
 135#define log_rdma(level, class, fmt, args...)                            \
 136do {                                                                    \
 137        if (level <= smbd_logging_level || class & smbd_logging_class)  \
 138                cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 139} while (0)
 140
 141#define log_outgoing(level, fmt, args...) \
 142                log_rdma(level, LOG_OUTGOING, fmt, ##args)
 143#define log_incoming(level, fmt, args...) \
 144                log_rdma(level, LOG_INCOMING, fmt, ##args)
 145#define log_read(level, fmt, args...)   log_rdma(level, LOG_READ, fmt, ##args)
 146#define log_write(level, fmt, args...)  log_rdma(level, LOG_WRITE, fmt, ##args)
 147#define log_rdma_send(level, fmt, args...) \
 148                log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 149#define log_rdma_recv(level, fmt, args...) \
 150                log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 151#define log_keep_alive(level, fmt, args...) \
 152                log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 153#define log_rdma_event(level, fmt, args...) \
 154                log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 155#define log_rdma_mr(level, fmt, args...) \
 156                log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 157
 158static void smbd_disconnect_rdma_work(struct work_struct *work)
 159{
 160        struct smbd_connection *info =
 161                container_of(work, struct smbd_connection, disconnect_work);
 162
 163        if (info->transport_status == SMBD_CONNECTED) {
 164                info->transport_status = SMBD_DISCONNECTING;
 165                rdma_disconnect(info->id);
 166        }
 167}
 168
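     /*
      * Schedule a disconnect on the workqueue; callers may be in softirq
      * (completion) context, so rdma_disconnect() is deferred to
      * smbd_disconnect_rdma_work().
      */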
 169static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 170{
 171        queue_work(info->workqueue, &info->disconnect_work);
 172}
 173
 174/* Upcall from RDMA CM */
 175static int smbd_conn_upcall(
 176                struct rdma_cm_id *id, struct rdma_cm_event *event)
 177{
 178        struct smbd_connection *info = id->context;
 179
 180        log_rdma_event(INFO, "event=%d status=%d\n",
 181                event->event, event->status);
 182
 183        switch (event->event) {
 184        case RDMA_CM_EVENT_ADDR_RESOLVED:
 185        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 186                info->ri_rc = 0;
 187                complete(&info->ri_done);
 188                break;
 189
 190        case RDMA_CM_EVENT_ADDR_ERROR:
 191                info->ri_rc = -EHOSTUNREACH;
 192                complete(&info->ri_done);
 193                break;
 194
 195        case RDMA_CM_EVENT_ROUTE_ERROR:
 196                info->ri_rc = -ENETUNREACH;
 197                complete(&info->ri_done);
 198                break;
 199
 200        case RDMA_CM_EVENT_ESTABLISHED:
 201                log_rdma_event(INFO, "connected event=%d\n", event->event);
 202                info->transport_status = SMBD_CONNECTED;
 203                wake_up_interruptible(&info->conn_wait);
 204                break;
 205
 206        case RDMA_CM_EVENT_CONNECT_ERROR:
 207        case RDMA_CM_EVENT_UNREACHABLE:
 208        case RDMA_CM_EVENT_REJECTED:
 209                log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 210                info->transport_status = SMBD_DISCONNECTED;
 211                wake_up_interruptible(&info->conn_wait);
 212                break;
 213
 214        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 215        case RDMA_CM_EVENT_DISCONNECTED:
  216                /* This happens when we fail the negotiation */
 217                if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 218                        info->transport_status = SMBD_DISCONNECTED;
 219                        wake_up(&info->conn_wait);
 220                        break;
 221                }
 222
 223                info->transport_status = SMBD_DISCONNECTED;
 224                wake_up_interruptible(&info->disconn_wait);
 225                wake_up_interruptible(&info->wait_reassembly_queue);
 226                wake_up_interruptible_all(&info->wait_send_queue);
 227                break;
 228
 229        default:
 230                break;
 231        }
 232
 233        return 0;
 234}
 235
 236/* Upcall from RDMA QP */
 237static void
 238smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 239{
 240        struct smbd_connection *info = context;
 241
 242        log_rdma_event(ERR, "%s on device %s info %p\n",
 243                ib_event_msg(event->event), event->device->name, info);
 244
 245        switch (event->event) {
 246        case IB_EVENT_CQ_ERR:
 247        case IB_EVENT_QP_FATAL:
 248                smbd_disconnect_rdma_connection(info);
 249
 250        default:
 251                break;
 252        }
 253}
 254
 255static inline void *smbd_request_payload(struct smbd_request *request)
 256{
 257        return (void *)request->packet;
 258}
 259
 260static inline void *smbd_response_payload(struct smbd_response *response)
 261{
 262        return (void *)response->packet;
 263}
 264
 265/* Called when a RDMA send is done */
 266static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 267{
 268        int i;
 269        struct smbd_request *request =
 270                container_of(wc->wr_cqe, struct smbd_request, cqe);
 271
 272        log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 273                request, wc->status);
 274
 275        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 276                log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 277                        wc->status, wc->opcode);
 278                smbd_disconnect_rdma_connection(request->info);
 279        }
 280
 281        for (i = 0; i < request->num_sge; i++)
 282                ib_dma_unmap_single(request->info->id->device,
 283                        request->sge[i].addr,
 284                        request->sge[i].length,
 285                        DMA_TO_DEVICE);
 286
 287        if (request->has_payload) {
 288                if (atomic_dec_and_test(&request->info->send_payload_pending))
 289                        wake_up(&request->info->wait_send_payload_pending);
 290        } else {
 291                if (atomic_dec_and_test(&request->info->send_pending))
 292                        wake_up(&request->info->wait_send_pending);
 293        }
 294
 295        mempool_free(request, request->info->request_mempool);
 296}
 297
 298static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 299{
 300        log_rdma_event(INFO, "resp message min_version %u max_version %u "
 301                "negotiated_version %u credits_requested %u "
 302                "credits_granted %u status %u max_readwrite_size %u "
 303                "preferred_send_size %u max_receive_size %u "
 304                "max_fragmented_size %u\n",
 305                resp->min_version, resp->max_version, resp->negotiated_version,
 306                resp->credits_requested, resp->credits_granted, resp->status,
 307                resp->max_readwrite_size, resp->preferred_send_size,
 308                resp->max_receive_size, resp->max_fragmented_size);
 309}
 310
 311/*
  312 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
 313 * response, packet_length: the negotiation response message
 314 * return value: true if negotiation is a success, false if failed
 315 */
 316static bool process_negotiation_response(
 317                struct smbd_response *response, int packet_length)
 318{
 319        struct smbd_connection *info = response->info;
 320        struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 321
 322        if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 323                log_rdma_event(ERR,
 324                        "error: packet_length=%d\n", packet_length);
 325                return false;
 326        }
 327
 328        if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 329                log_rdma_event(ERR, "error: negotiated_version=%x\n",
 330                        le16_to_cpu(packet->negotiated_version));
 331                return false;
 332        }
 333        info->protocol = le16_to_cpu(packet->negotiated_version);
 334
 335        if (packet->credits_requested == 0) {
 336                log_rdma_event(ERR, "error: credits_requested==0\n");
 337                return false;
 338        }
 339        info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 340
 341        if (packet->credits_granted == 0) {
 342                log_rdma_event(ERR, "error: credits_granted==0\n");
 343                return false;
 344        }
 345        atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 346
 347        atomic_set(&info->receive_credits, 0);
 348
 349        if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 350                log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 351                        le32_to_cpu(packet->preferred_send_size));
 352                return false;
 353        }
 354        info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 355
 356        if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 357                log_rdma_event(ERR, "error: max_receive_size=%d\n",
 358                        le32_to_cpu(packet->max_receive_size));
 359                return false;
 360        }
 361        info->max_send_size = min_t(int, info->max_send_size,
 362                                        le32_to_cpu(packet->max_receive_size));
 363
 364        if (le32_to_cpu(packet->max_fragmented_size) <
 365                        SMBD_MIN_FRAGMENTED_SIZE) {
 366                log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 367                        le32_to_cpu(packet->max_fragmented_size));
 368                return false;
 369        }
 370        info->max_fragmented_send_size =
 371                le32_to_cpu(packet->max_fragmented_size);
 372        info->rdma_readwrite_threshold =
 373                rdma_readwrite_threshold > info->max_fragmented_send_size ?
 374                info->max_fragmented_send_size :
 375                rdma_readwrite_threshold;
 376
 377
 378        info->max_readwrite_size = min_t(u32,
 379                        le32_to_cpu(packet->max_readwrite_size),
 380                        info->max_frmr_depth * PAGE_SIZE);
 381        info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 382
 383        return true;
 384}
 385
 386/*
 387 * Check and schedule to send an immediate packet
  388 * This is used to extend credits to the remote peer to keep the transport busy
 389 */
 390static void check_and_send_immediate(struct smbd_connection *info)
 391{
 392        if (info->transport_status != SMBD_CONNECTED)
 393                return;
 394
 395        info->send_immediate = true;
 396
 397        /*
 398         * Promptly send a packet if our peer is running low on receive
 399         * credits
 400         */
 401        if (atomic_read(&info->receive_credits) <
 402                info->receive_credit_target - 1)
 403                queue_delayed_work(
 404                        info->workqueue, &info->send_immediate_work, 0);
 405}
 406
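     /*
      * Work item that reposts receive buffers, first from the receive queue
      * and then from the empty packet queue, until the receive credit target
      * is met; the newly offered credits are recorded and granted to the peer
      * on the next send.
      */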
 407static void smbd_post_send_credits(struct work_struct *work)
 408{
 409        int ret = 0;
 410        int use_receive_queue = 1;
 411        int rc;
 412        struct smbd_response *response;
 413        struct smbd_connection *info =
 414                container_of(work, struct smbd_connection,
 415                        post_send_credits_work);
 416
 417        if (info->transport_status != SMBD_CONNECTED) {
 418                wake_up(&info->wait_receive_queues);
 419                return;
 420        }
 421
 422        if (info->receive_credit_target >
 423                atomic_read(&info->receive_credits)) {
 424                while (true) {
 425                        if (use_receive_queue)
 426                                response = get_receive_buffer(info);
 427                        else
 428                                response = get_empty_queue_buffer(info);
 429                        if (!response) {
  430                                /* now switch to empty packet queue */
 431                                if (use_receive_queue) {
 432                                        use_receive_queue = 0;
 433                                        continue;
 434                                } else
 435                                        break;
 436                        }
 437
 438                        response->type = SMBD_TRANSFER_DATA;
 439                        response->first_segment = false;
 440                        rc = smbd_post_recv(info, response);
 441                        if (rc) {
 442                                log_rdma_recv(ERR,
 443                                        "post_recv failed rc=%d\n", rc);
 444                                put_receive_buffer(info, response);
 445                                break;
 446                        }
 447
 448                        ret++;
 449                }
 450        }
 451
 452        spin_lock(&info->lock_new_credits_offered);
 453        info->new_credits_offered += ret;
 454        spin_unlock(&info->lock_new_credits_offered);
 455
 456        atomic_add(ret, &info->receive_credits);
 457
 458        /* Check if we can post new receive and grant credits to peer */
 459        check_and_send_immediate(info);
 460}
 461
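     /*
      * Deferred work run after a receive completes: wake any sender waiting
      * on send credits and check whether an immediate packet should be sent
      * to grant credits or answer a KEEP_ALIVE request.
      */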
 462static void smbd_recv_done_work(struct work_struct *work)
 463{
 464        struct smbd_connection *info =
 465                container_of(work, struct smbd_connection, recv_done_work);
 466
 467        /*
 468         * We may have new send credits granted from remote peer
  469         * If any sender is blocked on lack of credits, unblock it
 470         */
 471        if (atomic_read(&info->send_credits))
 472                wake_up_interruptible(&info->wait_send_queue);
 473
 474        /*
 475         * Check if we need to send something to remote peer to
 476         * grant more credits or respond to KEEP_ALIVE packet
 477         */
 478        check_and_send_immediate(info);
 479}
 480
 481/* Called from softirq, when recv is done */
 482static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 483{
 484        struct smbd_data_transfer *data_transfer;
 485        struct smbd_response *response =
 486                container_of(wc->wr_cqe, struct smbd_response, cqe);
 487        struct smbd_connection *info = response->info;
 488        int data_length = 0;
 489
 490        log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d "
 491                      "byte_len=%d pkey_index=%x\n",
 492                response, response->type, wc->status, wc->opcode,
 493                wc->byte_len, wc->pkey_index);
 494
 495        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 496                log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 497                        wc->status, wc->opcode);
 498                smbd_disconnect_rdma_connection(info);
 499                goto error;
 500        }
 501
 502        ib_dma_sync_single_for_cpu(
 503                wc->qp->device,
 504                response->sge.addr,
 505                response->sge.length,
 506                DMA_FROM_DEVICE);
 507
 508        switch (response->type) {
 509        /* SMBD negotiation response */
 510        case SMBD_NEGOTIATE_RESP:
 511                dump_smbd_negotiate_resp(smbd_response_payload(response));
 512                info->full_packet_received = true;
 513                info->negotiate_done =
 514                        process_negotiation_response(response, wc->byte_len);
 515                complete(&info->negotiate_completion);
 516                break;
 517
 518        /* SMBD data transfer packet */
 519        case SMBD_TRANSFER_DATA:
 520                data_transfer = smbd_response_payload(response);
 521                data_length = le32_to_cpu(data_transfer->data_length);
 522
 523                /*
  524                 * If this is a packet with data payload, place the data in the
 525                 * reassembly queue and wake up the reading thread
 526                 */
 527                if (data_length) {
 528                        if (info->full_packet_received)
 529                                response->first_segment = true;
 530
 531                        if (le32_to_cpu(data_transfer->remaining_data_length))
 532                                info->full_packet_received = false;
 533                        else
 534                                info->full_packet_received = true;
 535
 536                        enqueue_reassembly(
 537                                info,
 538                                response,
 539                                data_length);
 540                } else
 541                        put_empty_packet(info, response);
 542
 543                if (data_length)
 544                        wake_up_interruptible(&info->wait_reassembly_queue);
 545
 546                atomic_dec(&info->receive_credits);
 547                info->receive_credit_target =
 548                        le16_to_cpu(data_transfer->credits_requested);
 549                atomic_add(le16_to_cpu(data_transfer->credits_granted),
 550                        &info->send_credits);
 551
 552                log_incoming(INFO, "data flags %d data_offset %d "
 553                        "data_length %d remaining_data_length %d\n",
 554                        le16_to_cpu(data_transfer->flags),
 555                        le32_to_cpu(data_transfer->data_offset),
 556                        le32_to_cpu(data_transfer->data_length),
 557                        le32_to_cpu(data_transfer->remaining_data_length));
 558
 559                /* Send a KEEP_ALIVE response right away if requested */
 560                info->keep_alive_requested = KEEP_ALIVE_NONE;
 561                if (le16_to_cpu(data_transfer->flags) &
 562                                SMB_DIRECT_RESPONSE_REQUESTED) {
 563                        info->keep_alive_requested = KEEP_ALIVE_PENDING;
 564                }
 565
 566                queue_work(info->workqueue, &info->recv_done_work);
 567                return;
 568
 569        default:
 570                log_rdma_recv(ERR,
 571                        "unexpected response type=%d\n", response->type);
 572        }
 573
 574error:
 575        put_receive_buffer(info, response);
 576}
 577
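     /*
      * Create an RDMA CM id for the destination and resolve its address and
      * route, waiting up to RDMA_RESOLVE_TIMEOUT for each step.
      * Returns the id on success, or an ERR_PTR() on failure.
      */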
 578static struct rdma_cm_id *smbd_create_id(
 579                struct smbd_connection *info,
 580                struct sockaddr *dstaddr, int port)
 581{
 582        struct rdma_cm_id *id;
 583        int rc;
 584        __be16 *sport;
 585
 586        id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 587                RDMA_PS_TCP, IB_QPT_RC);
 588        if (IS_ERR(id)) {
 589                rc = PTR_ERR(id);
 590                log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 591                return id;
 592        }
 593
 594        if (dstaddr->sa_family == AF_INET6)
 595                sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 596        else
 597                sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 598
 599        *sport = htons(port);
 600
 601        init_completion(&info->ri_done);
 602        info->ri_rc = -ETIMEDOUT;
 603
 604        rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 605                RDMA_RESOLVE_TIMEOUT);
 606        if (rc) {
 607                log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 608                goto out;
 609        }
 610        wait_for_completion_interruptible_timeout(
 611                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 612        rc = info->ri_rc;
 613        if (rc) {
 614                log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 615                goto out;
 616        }
 617
 618        info->ri_rc = -ETIMEDOUT;
 619        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 620        if (rc) {
 621                log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 622                goto out;
 623        }
 624        wait_for_completion_interruptible_timeout(
 625                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 626        rc = info->ri_rc;
 627        if (rc) {
 628                log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 629                goto out;
 630        }
 631
 632        return id;
 633
 634out:
 635        rdma_destroy_id(id);
 636        return ERR_PTR(rc);
 637}
 638
 639/*
 640 * Test if FRWR (Fast Registration Work Requests) is supported on the device
  641 * This implementation requires FRWR for RDMA read/write
 642 * return value: true if it is supported
 643 */
 644static bool frwr_is_supported(struct ib_device_attr *attrs)
 645{
 646        if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 647                return false;
 648        if (attrs->max_fast_reg_page_list_len == 0)
 649                return false;
 650        return true;
 651}
 652
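     /*
      * Open the RDMA "interface adapter": create the CM id, verify FRWR
      * support, choose the MR type and FRMR depth, and allocate the
      * protection domain.
      */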
 653static int smbd_ia_open(
 654                struct smbd_connection *info,
 655                struct sockaddr *dstaddr, int port)
 656{
 657        int rc;
 658
 659        info->id = smbd_create_id(info, dstaddr, port);
 660        if (IS_ERR(info->id)) {
 661                rc = PTR_ERR(info->id);
 662                goto out1;
 663        }
 664
 665        if (!frwr_is_supported(&info->id->device->attrs)) {
 666                log_rdma_event(ERR,
 667                        "Fast Registration Work Requests "
 668                        "(FRWR) is not supported\n");
 669                log_rdma_event(ERR,
 670                        "Device capability flags = %llx "
 671                        "max_fast_reg_page_list_len = %u\n",
 672                        info->id->device->attrs.device_cap_flags,
 673                        info->id->device->attrs.max_fast_reg_page_list_len);
 674                rc = -EPROTONOSUPPORT;
 675                goto out2;
 676        }
 677        info->max_frmr_depth = min_t(int,
 678                smbd_max_frmr_depth,
 679                info->id->device->attrs.max_fast_reg_page_list_len);
 680        info->mr_type = IB_MR_TYPE_MEM_REG;
 681        if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 682                info->mr_type = IB_MR_TYPE_SG_GAPS;
 683
 684        info->pd = ib_alloc_pd(info->id->device, 0);
 685        if (IS_ERR(info->pd)) {
 686                rc = PTR_ERR(info->pd);
 687                log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 688                goto out2;
 689        }
 690
 691        return 0;
 692
 693out2:
 694        rdma_destroy_id(info->id);
 695        info->id = NULL;
 696
 697out1:
 698        return rc;
 699}
 700
 701/*
 702 * Send a negotiation request message to the peer
 703 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 704 * After negotiation, the transport is connected and ready for
 705 * carrying upper layer SMB payload
 706 */
 707static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 708{
 709        struct ib_send_wr send_wr;
 710        int rc = -ENOMEM;
 711        struct smbd_request *request;
 712        struct smbd_negotiate_req *packet;
 713
 714        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 715        if (!request)
 716                return rc;
 717
 718        request->info = info;
 719
 720        packet = smbd_request_payload(request);
 721        packet->min_version = cpu_to_le16(SMBD_V1);
 722        packet->max_version = cpu_to_le16(SMBD_V1);
 723        packet->reserved = 0;
 724        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 725        packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 726        packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 727        packet->max_fragmented_size =
 728                cpu_to_le32(info->max_fragmented_recv_size);
 729
 730        request->num_sge = 1;
 731        request->sge[0].addr = ib_dma_map_single(
 732                                info->id->device, (void *)packet,
 733                                sizeof(*packet), DMA_TO_DEVICE);
 734        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 735                rc = -EIO;
 736                goto dma_mapping_failed;
 737        }
 738
 739        request->sge[0].length = sizeof(*packet);
 740        request->sge[0].lkey = info->pd->local_dma_lkey;
 741
 742        ib_dma_sync_single_for_device(
 743                info->id->device, request->sge[0].addr,
 744                request->sge[0].length, DMA_TO_DEVICE);
 745
 746        request->cqe.done = send_done;
 747
 748        send_wr.next = NULL;
 749        send_wr.wr_cqe = &request->cqe;
 750        send_wr.sg_list = request->sge;
 751        send_wr.num_sge = request->num_sge;
 752        send_wr.opcode = IB_WR_SEND;
 753        send_wr.send_flags = IB_SEND_SIGNALED;
 754
 755        log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 756                request->sge[0].addr,
 757                request->sge[0].length, request->sge[0].lkey);
 758
 759        request->has_payload = false;
 760        atomic_inc(&info->send_pending);
 761        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 762        if (!rc)
 763                return 0;
 764
 765        /* if we reach here, post send failed */
 766        log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 767        atomic_dec(&info->send_pending);
 768        ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 769                request->sge[0].length, DMA_TO_DEVICE);
 770
 771        smbd_disconnect_rdma_connection(info);
 772
 773dma_mapping_failed:
 774        mempool_free(request, info->request_mempool);
 775        return rc;
 776}
 777
 778/*
 779 * Extend the credits to remote peer
 780 * This implements [MS-SMBD] 3.1.5.9
 781 * The idea is that we should extend credits to remote peer as quickly as
 782 * it's allowed, to maintain data flow. We allocate as much receive
 783 * buffer as possible, and extend the receive credits to remote peer
  784 * return value: the new credits being granted.
 785 */
 786static int manage_credits_prior_sending(struct smbd_connection *info)
 787{
 788        int new_credits;
 789
 790        spin_lock(&info->lock_new_credits_offered);
 791        new_credits = info->new_credits_offered;
 792        info->new_credits_offered = 0;
 793        spin_unlock(&info->lock_new_credits_offered);
 794
 795        return new_credits;
 796}
 797
 798/*
 799 * Check if we need to send a KEEP_ALIVE message
  800 * The idle connection timer triggers a KEEP_ALIVE message when it expires
 801 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have peer send
 802 * back a response.
 803 * return value:
 804 * 1 if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 805 * 0: otherwise
 806 */
 807static int manage_keep_alive_before_sending(struct smbd_connection *info)
 808{
 809        if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 810                info->keep_alive_requested = KEEP_ALIVE_SENT;
 811                return 1;
 812        }
 813        return 0;
 814}
 815
 816/*
 817 * Build and prepare the SMBD packet header
  818 * This function waits for available send credits and builds a SMBD packet
  819 * header. The caller can then optionally append a payload to the packet after
  820 * the header
  821 * input values
 822 * size: the size of the payload
 823 * remaining_data_length: remaining data to send if this is part of a
 824 * fragmented packet
 825 * output values
 826 * request_out: the request allocated from this function
 827 * return values: 0 on success, otherwise actual error code returned
 828 */
 829static int smbd_create_header(struct smbd_connection *info,
 830                int size, int remaining_data_length,
 831                struct smbd_request **request_out)
 832{
 833        struct smbd_request *request;
 834        struct smbd_data_transfer *packet;
 835        int header_length;
 836        int rc;
 837
 838        /* Wait for send credits. A SMBD packet needs one credit */
 839        rc = wait_event_interruptible(info->wait_send_queue,
 840                atomic_read(&info->send_credits) > 0 ||
 841                info->transport_status != SMBD_CONNECTED);
 842        if (rc)
 843                return rc;
 844
 845        if (info->transport_status != SMBD_CONNECTED) {
 846                log_outgoing(ERR, "disconnected not sending\n");
 847                return -EAGAIN;
 848        }
 849        atomic_dec(&info->send_credits);
 850
 851        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 852        if (!request) {
 853                rc = -ENOMEM;
 854                goto err;
 855        }
 856
 857        request->info = info;
 858
 859        /* Fill in the packet header */
 860        packet = smbd_request_payload(request);
 861        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 862        packet->credits_granted =
 863                cpu_to_le16(manage_credits_prior_sending(info));
 864        info->send_immediate = false;
 865
 866        packet->flags = 0;
 867        if (manage_keep_alive_before_sending(info))
 868                packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 869
 870        packet->reserved = 0;
 871        if (!size)
 872                packet->data_offset = 0;
 873        else
 874                packet->data_offset = cpu_to_le32(24);
 875        packet->data_length = cpu_to_le32(size);
 876        packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 877        packet->padding = 0;
 878
 879        log_outgoing(INFO, "credits_requested=%d credits_granted=%d "
 880                "data_offset=%d data_length=%d remaining_data_length=%d\n",
 881                le16_to_cpu(packet->credits_requested),
 882                le16_to_cpu(packet->credits_granted),
 883                le32_to_cpu(packet->data_offset),
 884                le32_to_cpu(packet->data_length),
 885                le32_to_cpu(packet->remaining_data_length));
 886
 887        /* Map the packet to DMA */
 888        header_length = sizeof(struct smbd_data_transfer);
 889        /* If this is a packet without payload, don't send padding */
 890        if (!size)
 891                header_length = offsetof(struct smbd_data_transfer, padding);
 892
 893        request->num_sge = 1;
 894        request->sge[0].addr = ib_dma_map_single(info->id->device,
 895                                                 (void *)packet,
 896                                                 header_length,
 897                                                 DMA_TO_DEVICE);
 898        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 899                mempool_free(request, info->request_mempool);
 900                rc = -EIO;
 901                goto err;
 902        }
 903
 904        request->sge[0].length = header_length;
 905        request->sge[0].lkey = info->pd->local_dma_lkey;
 906
 907        *request_out = request;
 908        return 0;
 909
 910err:
 911        atomic_inc(&info->send_credits);
 912        return rc;
 913}
 914
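     /*
      * Undo smbd_create_header(): unmap the header SGE, free the request and
      * return the send credit it consumed.
      */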
 915static void smbd_destroy_header(struct smbd_connection *info,
 916                struct smbd_request *request)
 917{
 918
 919        ib_dma_unmap_single(info->id->device,
 920                            request->sge[0].addr,
 921                            request->sge[0].length,
 922                            DMA_TO_DEVICE);
 923        mempool_free(request, info->request_mempool);
 924        atomic_inc(&info->send_credits);
 925}
 926
 927/* Post the send request */
 928static int smbd_post_send(struct smbd_connection *info,
 929                struct smbd_request *request, bool has_payload)
 930{
 931        struct ib_send_wr send_wr;
 932        int rc, i;
 933
 934        for (i = 0; i < request->num_sge; i++) {
 935                log_rdma_send(INFO,
 936                        "rdma_request sge[%d] addr=%llu length=%u\n",
 937                        i, request->sge[i].addr, request->sge[i].length);
 938                ib_dma_sync_single_for_device(
 939                        info->id->device,
 940                        request->sge[i].addr,
 941                        request->sge[i].length,
 942                        DMA_TO_DEVICE);
 943        }
 944
 945        request->cqe.done = send_done;
 946
 947        send_wr.next = NULL;
 948        send_wr.wr_cqe = &request->cqe;
 949        send_wr.sg_list = request->sge;
 950        send_wr.num_sge = request->num_sge;
 951        send_wr.opcode = IB_WR_SEND;
 952        send_wr.send_flags = IB_SEND_SIGNALED;
 953
 954        if (has_payload) {
 955                request->has_payload = true;
 956                atomic_inc(&info->send_payload_pending);
 957        } else {
 958                request->has_payload = false;
 959                atomic_inc(&info->send_pending);
 960        }
 961
 962        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 963        if (rc) {
 964                log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 965                if (has_payload) {
 966                        if (atomic_dec_and_test(&info->send_payload_pending))
 967                                wake_up(&info->wait_send_payload_pending);
 968                } else {
 969                        if (atomic_dec_and_test(&info->send_pending))
 970                                wake_up(&info->wait_send_pending);
 971                }
 972                smbd_disconnect_rdma_connection(info);
 973                rc = -EAGAIN;
 974        } else
 975                /* Reset timer for idle connection after packet is sent */
 976                mod_delayed_work(info->workqueue, &info->idle_timer_work,
 977                        info->keep_alive_interval*HZ);
 978
 979        return rc;
 980}
 981
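     /*
      * Build and post a send consisting of a SMBD header plus an optional
      * scatterlist payload. Each payload segment is DMA-mapped into its own
      * SGE; on failure the mapped segments are unmapped and the header is
      * destroyed, returning the send credit.
      */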
 982static int smbd_post_send_sgl(struct smbd_connection *info,
 983        struct scatterlist *sgl, int data_length, int remaining_data_length)
 984{
 985        int num_sgs;
 986        int i, rc;
 987        struct smbd_request *request;
 988        struct scatterlist *sg;
 989
 990        rc = smbd_create_header(
 991                info, data_length, remaining_data_length, &request);
 992        if (rc)
 993                return rc;
 994
 995        num_sgs = sgl ? sg_nents(sgl) : 0;
 996        for_each_sg(sgl, sg, num_sgs, i) {
 997                request->sge[i+1].addr =
 998                        ib_dma_map_page(info->id->device, sg_page(sg),
 999                               sg->offset, sg->length, DMA_TO_DEVICE);
1000                if (ib_dma_mapping_error(
1001                                info->id->device, request->sge[i+1].addr)) {
1002                        rc = -EIO;
1003                        request->sge[i+1].addr = 0;
1004                        goto dma_mapping_failure;
1005                }
1006                request->sge[i+1].length = sg->length;
1007                request->sge[i+1].lkey = info->pd->local_dma_lkey;
1008                request->num_sge++;
1009        }
1010
1011        rc = smbd_post_send(info, request, data_length);
1012        if (!rc)
1013                return 0;
1014
1015dma_mapping_failure:
1016        for (i = 1; i < request->num_sge; i++)
1017                if (request->sge[i].addr)
1018                        ib_dma_unmap_single(info->id->device,
1019                                            request->sge[i].addr,
1020                                            request->sge[i].length,
1021                                            DMA_TO_DEVICE);
1022        smbd_destroy_header(info, request);
1023        return rc;
1024}
1025
1026/*
1027 * Send a page
1028 * page: the page to send
1029 * offset: offset in the page to send
1030 * size: length in the page to send
1031 * remaining_data_length: remaining data to send in this payload
1032 */
1033static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
1034                unsigned long offset, size_t size, int remaining_data_length)
1035{
1036        struct scatterlist sgl;
1037
1038        sg_init_table(&sgl, 1);
1039        sg_set_page(&sgl, page, size, offset);
1040
1041        return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
1042}
1043
1044/*
1045 * Send an empty message
 1046 * An empty message is used to extend credits to the peer and to keep the
 1047 * connection alive while there is no upper layer payload to send at the time
1048 */
1049static int smbd_post_send_empty(struct smbd_connection *info)
1050{
1051        info->count_send_empty++;
1052        return smbd_post_send_sgl(info, NULL, 0, 0);
1053}
1054
1055/*
1056 * Send a data buffer
1057 * iov: the iov array describing the data buffers
 1058 * n_vec: number of entries in the iov array
1059 * remaining_data_length: remaining data to send following this packet
1060 * in segmented SMBD packet
1061 */
1062static int smbd_post_send_data(
1063        struct smbd_connection *info, struct kvec *iov, int n_vec,
1064        int remaining_data_length)
1065{
1066        int i;
1067        u32 data_length = 0;
1068        struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1069
1070        if (n_vec > SMBDIRECT_MAX_SGE) {
1071                cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1072                return -ENOMEM;
1073        }
1074
1075        sg_init_table(sgl, n_vec);
1076        for (i = 0; i < n_vec; i++) {
1077                data_length += iov[i].iov_len;
1078                sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1079        }
1080
1081        return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1082}
1083
1084/*
1085 * Post a receive request to the transport
1086 * The remote peer can only send data when a receive request is posted
1087 * The interaction is controlled by send/receive credit system
1088 */
1089static int smbd_post_recv(
1090                struct smbd_connection *info, struct smbd_response *response)
1091{
1092        struct ib_recv_wr recv_wr;
1093        int rc = -EIO;
1094
1095        response->sge.addr = ib_dma_map_single(
1096                                info->id->device, response->packet,
1097                                info->max_receive_size, DMA_FROM_DEVICE);
1098        if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1099                return rc;
1100
1101        response->sge.length = info->max_receive_size;
1102        response->sge.lkey = info->pd->local_dma_lkey;
1103
1104        response->cqe.done = recv_done;
1105
1106        recv_wr.wr_cqe = &response->cqe;
1107        recv_wr.next = NULL;
1108        recv_wr.sg_list = &response->sge;
1109        recv_wr.num_sge = 1;
1110
1111        rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
1112        if (rc) {
1113                ib_dma_unmap_single(info->id->device, response->sge.addr,
1114                                    response->sge.length, DMA_FROM_DEVICE);
1115                smbd_disconnect_rdma_connection(info);
1116                log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1117        }
1118
1119        return rc;
1120}
1121
1122/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1123static int smbd_negotiate(struct smbd_connection *info)
1124{
1125        int rc;
1126        struct smbd_response *response = get_receive_buffer(info);
1127
1128        response->type = SMBD_NEGOTIATE_RESP;
1129        rc = smbd_post_recv(info, response);
1130        log_rdma_event(INFO,
1131                "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x "
1132                "iov.lkey=%x\n",
1133                rc, response->sge.addr,
1134                response->sge.length, response->sge.lkey);
1135        if (rc)
1136                return rc;
1137
1138        init_completion(&info->negotiate_completion);
1139        info->negotiate_done = false;
1140        rc = smbd_post_send_negotiate_req(info);
1141        if (rc)
1142                return rc;
1143
1144        rc = wait_for_completion_interruptible_timeout(
1145                &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1146        log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1147
1148        if (info->negotiate_done)
1149                return 0;
1150
1151        if (rc == 0)
1152                rc = -ETIMEDOUT;
1153        else if (rc == -ERESTARTSYS)
1154                rc = -EINTR;
1155        else
1156                rc = -ENOTCONN;
1157
1158        return rc;
1159}
1160
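     /*
      * Return a zero-payload receive buffer to the empty packet queue so it
      * can be reposted, then kick the work that extends receive credits.
      */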
1161static void put_empty_packet(
1162                struct smbd_connection *info, struct smbd_response *response)
1163{
1164        spin_lock(&info->empty_packet_queue_lock);
1165        list_add_tail(&response->list, &info->empty_packet_queue);
1166        info->count_empty_packet_queue++;
1167        spin_unlock(&info->empty_packet_queue_lock);
1168
1169        queue_work(info->workqueue, &info->post_send_credits_work);
1170}
1171
1172/*
1173 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
 1174 * This is a queue for reassembling upper layer payload and presenting it to
 1175 * the upper layer. All incoming payloads go to the reassembly queue, regardless
 1176 * of whether reassembly is required. The upper layer code reads from the queue
 1177 * for all incoming payloads.
1178 * Put a received packet to the reassembly queue
1179 * response: the packet received
1180 * data_length: the size of payload in this packet
1181 */
1182static void enqueue_reassembly(
1183        struct smbd_connection *info,
1184        struct smbd_response *response,
1185        int data_length)
1186{
1187        spin_lock(&info->reassembly_queue_lock);
1188        list_add_tail(&response->list, &info->reassembly_queue);
1189        info->reassembly_queue_length++;
1190        /*
1191         * Make sure reassembly_data_length is updated after list and
1192         * reassembly_queue_length are updated. On the dequeue side
1193         * reassembly_data_length is checked without a lock to determine
1194         * if reassembly_queue_length and list is up to date
1195         */
1196        virt_wmb();
1197        info->reassembly_data_length += data_length;
1198        spin_unlock(&info->reassembly_queue_lock);
1199        info->count_reassembly_queue++;
1200        info->count_enqueue_reassembly_queue++;
1201}
1202
1203/*
1204 * Get the first entry at the front of reassembly queue
1205 * Caller is responsible for locking
1206 * return value: the first entry if any, NULL if queue is empty
1207 */
1208static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1209{
1210        struct smbd_response *ret = NULL;
1211
1212        if (!list_empty(&info->reassembly_queue)) {
1213                ret = list_first_entry(
1214                        &info->reassembly_queue,
1215                        struct smbd_response, list);
1216        }
1217        return ret;
1218}
1219
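     /*
      * Take a buffer off the empty packet queue, if any, so it can be
      * reposted as a new receive.
      */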
1220static struct smbd_response *get_empty_queue_buffer(
1221                struct smbd_connection *info)
1222{
1223        struct smbd_response *ret = NULL;
1224        unsigned long flags;
1225
1226        spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1227        if (!list_empty(&info->empty_packet_queue)) {
1228                ret = list_first_entry(
1229                        &info->empty_packet_queue,
1230                        struct smbd_response, list);
1231                list_del(&ret->list);
1232                info->count_empty_packet_queue--;
1233        }
1234        spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1235
1236        return ret;
1237}
1238
1239/*
1240 * Get a receive buffer
1241 * For each remote send, we need to post a receive. The receive buffers are
 1242 * pre-allocated.
1243 * return value: the receive buffer, NULL if none is available
1244 */
1245static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1246{
1247        struct smbd_response *ret = NULL;
1248        unsigned long flags;
1249
1250        spin_lock_irqsave(&info->receive_queue_lock, flags);
1251        if (!list_empty(&info->receive_queue)) {
1252                ret = list_first_entry(
1253                        &info->receive_queue,
1254                        struct smbd_response, list);
1255                list_del(&ret->list);
1256                info->count_receive_queue--;
1257                info->count_get_receive_buffer++;
1258        }
1259        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1260
1261        return ret;
1262}
1263
1264/*
1265 * Return a receive buffer
1266 * Upon returning of a receive buffer, we can post new receive and extend
1267 * more receive credits to remote peer. This is done immediately after a
1268 * receive buffer is returned.
1269 */
1270static void put_receive_buffer(
1271        struct smbd_connection *info, struct smbd_response *response)
1272{
1273        unsigned long flags;
1274
1275        ib_dma_unmap_single(info->id->device, response->sge.addr,
1276                response->sge.length, DMA_FROM_DEVICE);
1277
1278        spin_lock_irqsave(&info->receive_queue_lock, flags);
1279        list_add_tail(&response->list, &info->receive_queue);
1280        info->count_receive_queue++;
1281        info->count_put_receive_buffer++;
1282        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1283
1284        queue_work(info->workqueue, &info->post_send_credits_work);
1285}
1286
1287/* Preallocate all receive buffer on transport establishment */
1288static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1289{
1290        int i;
1291        struct smbd_response *response;
1292
1293        INIT_LIST_HEAD(&info->reassembly_queue);
1294        spin_lock_init(&info->reassembly_queue_lock);
1295        info->reassembly_data_length = 0;
1296        info->reassembly_queue_length = 0;
1297
1298        INIT_LIST_HEAD(&info->receive_queue);
1299        spin_lock_init(&info->receive_queue_lock);
1300        info->count_receive_queue = 0;
1301
1302        INIT_LIST_HEAD(&info->empty_packet_queue);
1303        spin_lock_init(&info->empty_packet_queue_lock);
1304        info->count_empty_packet_queue = 0;
1305
1306        init_waitqueue_head(&info->wait_receive_queues);
1307
1308        for (i = 0; i < num_buf; i++) {
1309                response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1310                if (!response)
1311                        goto allocate_failed;
1312
1313                response->info = info;
1314                list_add_tail(&response->list, &info->receive_queue);
1315                info->count_receive_queue++;
1316        }
1317
1318        return 0;
1319
1320allocate_failed:
1321        while (!list_empty(&info->receive_queue)) {
1322                response = list_first_entry(
1323                                &info->receive_queue,
1324                                struct smbd_response, list);
1325                list_del(&response->list);
1326                info->count_receive_queue--;
1327
1328                mempool_free(response, info->response_mempool);
1329        }
1330        return -ENOMEM;
1331}
1332
1333static void destroy_receive_buffers(struct smbd_connection *info)
1334{
1335        struct smbd_response *response;
1336
1337        while ((response = get_receive_buffer(info)))
1338                mempool_free(response, info->response_mempool);
1339
1340        while ((response = get_empty_queue_buffer(info)))
1341                mempool_free(response, info->response_mempool);
1342}
1343
1344/*
1345 * Check and send an immediate or keep alive packet
 1346 * The conditions to send those packets are defined in [MS-SMBD] 3.1.1.1
1347 * Connection.KeepaliveRequested and Connection.SendImmediate
 1348 * The idea is to extend credits to the server as soon as they become available
1349 */
1350static void send_immediate_work(struct work_struct *work)
1351{
1352        struct smbd_connection *info = container_of(
1353                                        work, struct smbd_connection,
1354                                        send_immediate_work.work);
1355
1356        if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
1357            info->send_immediate) {
1358                log_keep_alive(INFO, "send an empty message\n");
1359                smbd_post_send_empty(info);
1360        }
1361}
1362
1363/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1364static void idle_connection_timer(struct work_struct *work)
1365{
1366        struct smbd_connection *info = container_of(
1367                                        work, struct smbd_connection,
1368                                        idle_timer_work.work);
1369
1370        if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1371                log_keep_alive(ERR,
1372                        "error status info->keep_alive_requested=%d\n",
1373                        info->keep_alive_requested);
1374                smbd_disconnect_rdma_connection(info);
1375                return;
1376        }
1377
1378        log_keep_alive(INFO, "about to send an empty idle message\n");
1379        smbd_post_send_empty(info);
1380
1381        /* Setup the next idle timeout work */
1382        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1383                        info->keep_alive_interval*HZ);
1384}
1385
1386/*
1387 * Destroy the transport and related RDMA and memory resources
 1388 * Need to go through all the pending counters and make sure no one is using
1389 * the transport while it is destroyed
1390 */
1391void smbd_destroy(struct TCP_Server_Info *server)
1392{
1393        struct smbd_connection *info = server->smbd_conn;
1394        struct smbd_response *response;
1395        unsigned long flags;
1396
1397        if (!info) {
1398                log_rdma_event(INFO, "rdma session already destroyed\n");
1399                return;
1400        }
1401
1402        log_rdma_event(INFO, "destroying rdma session\n");
1403        if (info->transport_status != SMBD_DISCONNECTED) {
1404                rdma_disconnect(server->smbd_conn->id);
1405                log_rdma_event(INFO, "wait for transport being disconnected\n");
1406                wait_event_interruptible(
1407                        info->disconn_wait,
1408                        info->transport_status == SMBD_DISCONNECTED);
1409        }
1410
1411        log_rdma_event(INFO, "destroying qp\n");
1412        ib_drain_qp(info->id->qp);
1413        rdma_destroy_qp(info->id);
1414
1415        log_rdma_event(INFO, "cancelling idle timer\n");
1416        cancel_delayed_work_sync(&info->idle_timer_work);
1417        log_rdma_event(INFO, "cancelling send immediate work\n");
1418        cancel_delayed_work_sync(&info->send_immediate_work);
1419
1420        log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
1421        wait_event(info->wait_send_pending,
1422                atomic_read(&info->send_pending) == 0);
1423        wait_event(info->wait_send_payload_pending,
1424                atomic_read(&info->send_payload_pending) == 0);
1425
1426        /* It's not possible for the upper layer to get to the reassembly queue */
1427        log_rdma_event(INFO, "drain the reassembly queue\n");
1428        do {
1429                spin_lock_irqsave(&info->reassembly_queue_lock, flags);
1430                response = _get_first_reassembly(info);
1431                if (response) {
1432                        list_del(&response->list);
1433                        spin_unlock_irqrestore(
1434                                &info->reassembly_queue_lock, flags);
1435                        put_receive_buffer(info, response);
1436                } else
1437                        spin_unlock_irqrestore(
1438                                &info->reassembly_queue_lock, flags);
1439        } while (response);
1440        info->reassembly_data_length = 0;
1441
1442        log_rdma_event(INFO, "free receive buffers\n");
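            /*
             * Wait until every receive buffer is back on the receive or
             * empty-packet queue; only then is it safe to free them.
             */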
1443        wait_event(info->wait_receive_queues,
1444                info->count_receive_queue + info->count_empty_packet_queue
1445                        == info->receive_credit_max);
1446        destroy_receive_buffers(info);
1447
1448        /*
1449         * For performance reasons, memory registration and deregistration
1450         * are not locked by srv_mutex. It is possible some processes are
1451         * blocked on the transport srv_mutex while holding a memory registration.
1452         * Release the transport srv_mutex to allow them to hit the failure
1453         * path when sending data, and then release the memory registrations.
1454         */
1455        log_rdma_event(INFO, "freeing mr list\n");
1456        wake_up_interruptible_all(&info->wait_mr);
1457        while (atomic_read(&info->mr_used_count)) {
1458                mutex_unlock(&server->srv_mutex);
1459                msleep(1000);
1460                mutex_lock(&server->srv_mutex);
1461        }
1462        destroy_mr_list(info);
1463
1464        ib_free_cq(info->send_cq);
1465        ib_free_cq(info->recv_cq);
1466        ib_dealloc_pd(info->pd);
1467        rdma_destroy_id(info->id);
1468
1469        /* free mempools */
1470        mempool_destroy(info->request_mempool);
1471        kmem_cache_destroy(info->request_cache);
1472
1473        mempool_destroy(info->response_mempool);
1474        kmem_cache_destroy(info->response_cache);
1475
1476        info->transport_status = SMBD_DESTROYED;
1477
1478        destroy_workqueue(info->workqueue);
1479        kfree(info);
1480}
1481
1482/*
1483 * Reconnect this SMBD connection, called from upper layer
1484 * return value: 0 on success, or actual error code
1485 */
1486int smbd_reconnect(struct TCP_Server_Info *server)
1487{
1488        log_rdma_event(INFO, "reconnecting rdma session\n");
1489
1490        if (!server->smbd_conn) {
1491                log_rdma_event(INFO, "rdma session already destroyed\n");
1492                goto create_conn;
1493        }
1494
1495        /*
1496         * This is possible if the transport is disconnected and we haven't
1497         * received a notification from RDMA, but the upper layer has detected a timeout
1498         */
1499        if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1500                log_rdma_event(INFO, "disconnecting transport\n");
1501                smbd_destroy(server);
1502        }
1503
1504create_conn:
1505        log_rdma_event(INFO, "creating rdma session\n");
1506        server->smbd_conn = smbd_get_connection(
1507                server, (struct sockaddr *) &server->dstaddr);
1508        log_rdma_event(INFO, "created rdma session info=%p\n",
1509                server->smbd_conn);
1510
1511        return server->smbd_conn ? 0 : -ENOENT;
1512}
1513
1514static void destroy_caches_and_workqueue(struct smbd_connection *info)
1515{
1516        destroy_receive_buffers(info);
1517        destroy_workqueue(info->workqueue);
1518        mempool_destroy(info->response_mempool);
1519        kmem_cache_destroy(info->response_cache);
1520        mempool_destroy(info->request_mempool);
1521        kmem_cache_destroy(info->request_cache);
1522}
1523
1524#define MAX_NAME_LEN    80
1525static int allocate_caches_and_workqueue(struct smbd_connection *info)
1526{
1527        char name[MAX_NAME_LEN];
1528        int rc;
1529
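            /*
             * The request cache leaves room for an inline SMBD data transfer
             * header after each smbd_request; the response cache reserves
             * max_receive_size bytes of packet payload after each
             * smbd_response. The mempools preallocate enough objects to
             * cover the configured credit limits.
             */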
1530        scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1531        info->request_cache =
1532                kmem_cache_create(
1533                        name,
1534                        sizeof(struct smbd_request) +
1535                                sizeof(struct smbd_data_transfer),
1536                        0, SLAB_HWCACHE_ALIGN, NULL);
1537        if (!info->request_cache)
1538                return -ENOMEM;
1539
1540        info->request_mempool =
1541                mempool_create(info->send_credit_target, mempool_alloc_slab,
1542                        mempool_free_slab, info->request_cache);
1543        if (!info->request_mempool)
1544                goto out1;
1545
1546        scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1547        info->response_cache =
1548                kmem_cache_create(
1549                        name,
1550                        sizeof(struct smbd_response) +
1551                                info->max_receive_size,
1552                        0, SLAB_HWCACHE_ALIGN, NULL);
1553        if (!info->response_cache)
1554                goto out2;
1555
1556        info->response_mempool =
1557                mempool_create(info->receive_credit_max, mempool_alloc_slab,
1558                       mempool_free_slab, info->response_cache);
1559        if (!info->response_mempool)
1560                goto out3;
1561
1562        scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1563        info->workqueue = create_workqueue(name);
1564        if (!info->workqueue)
1565                goto out4;
1566
1567        rc = allocate_receive_buffers(info, info->receive_credit_max);
1568        if (rc) {
1569                log_rdma_event(ERR, "failed to allocate receive buffers\n");
1570                goto out5;
1571        }
1572
1573        return 0;
1574
1575out5:
1576        destroy_workqueue(info->workqueue);
1577out4:
1578        mempool_destroy(info->response_mempool);
1579out3:
1580        kmem_cache_destroy(info->response_cache);
1581out2:
1582        mempool_destroy(info->request_mempool);
1583out1:
1584        kmem_cache_destroy(info->request_cache);
1585        return -ENOMEM;
1586}
1587
1588/* Create a SMBD connection, called by upper layer */
1589static struct smbd_connection *_smbd_get_connection(
1590        struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1591{
1592        int rc;
1593        struct smbd_connection *info;
1594        struct rdma_conn_param conn_param;
1595        struct ib_qp_init_attr qp_attr;
1596        struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1597        struct ib_port_immutable port_immutable;
1598        u32 ird_ord_hdr[2];
1599
1600        info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1601        if (!info)
1602                return NULL;
1603
1604        info->transport_status = SMBD_CONNECTING;
1605        rc = smbd_ia_open(info, dstaddr, port);
1606        if (rc) {
1607                log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1608                goto create_id_failed;
1609        }
1610
1611        if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1612            smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1613                log_rdma_event(ERR,
1614                        "consider lowering send_credit_target = %d. "
1615                        "Possible CQE overrun, device "
1616                        "reporting max_cqe %d max_qp_wr %d\n",
1617                        smbd_send_credit_target,
1618                        info->id->device->attrs.max_cqe,
1619                        info->id->device->attrs.max_qp_wr);
1620                goto config_failed;
1621        }
1622
1623        if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1624            smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1625                log_rdma_event(ERR,
1626                        "consider lowering receive_credit_max = %d. "
1627                        "Possible CQE overrun, device "
1628                        "reporting max_cqe %d max_qp_wr %d\n",
1629                        smbd_receive_credit_max,
1630                        info->id->device->attrs.max_cqe,
1631                        info->id->device->attrs.max_qp_wr);
1632                goto config_failed;
1633        }
1634
1635        info->receive_credit_max = smbd_receive_credit_max;
1636        info->send_credit_target = smbd_send_credit_target;
1637        info->max_send_size = smbd_max_send_size;
1638        info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1639        info->max_receive_size = smbd_max_receive_size;
1640        info->keep_alive_interval = smbd_keep_alive_interval;
1641
1642        if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
1643                log_rdma_event(ERR,
1644                        "warning: device max_send_sge = %d too small\n",
1645                        info->id->device->attrs.max_send_sge);
1646                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1647        }
1648        if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
1649                log_rdma_event(ERR,
1650                        "warning: device max_recv_sge = %d too small\n",
1651                        info->id->device->attrs.max_recv_sge);
1652                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1653        }
1654
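            /*
             * Size the completion queues to the configured credit limits so
             * completions for the maximum number of outstanding sends or
             * receives cannot overrun them.
             */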
1655        info->send_cq = NULL;
1656        info->recv_cq = NULL;
1657        info->send_cq = ib_alloc_cq(info->id->device, info,
1658                        info->send_credit_target, 0, IB_POLL_SOFTIRQ);
1659        if (IS_ERR(info->send_cq)) {
1660                info->send_cq = NULL;
1661                goto alloc_cq_failed;
1662        }
1663
1664        info->recv_cq = ib_alloc_cq(info->id->device, info,
1665                        info->receive_credit_max, 0, IB_POLL_SOFTIRQ);
1666        if (IS_ERR(info->recv_cq)) {
1667                info->recv_cq = NULL;
1668                goto alloc_cq_failed;
1669        }
1670
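            /*
             * The QP work request limits mirror the credit limits above; the
             * SGE limits use SMBDIRECT_MAX_SGE, which was checked against the
             * device capabilities earlier.
             */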
1671        memset(&qp_attr, 0, sizeof(qp_attr));
1672        qp_attr.event_handler = smbd_qp_async_error_upcall;
1673        qp_attr.qp_context = info;
1674        qp_attr.cap.max_send_wr = info->send_credit_target;
1675        qp_attr.cap.max_recv_wr = info->receive_credit_max;
1676        qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1677        qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1678        qp_attr.cap.max_inline_data = 0;
1679        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1680        qp_attr.qp_type = IB_QPT_RC;
1681        qp_attr.send_cq = info->send_cq;
1682        qp_attr.recv_cq = info->recv_cq;
1683        qp_attr.port_num = ~0;
1684
1685        rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1686        if (rc) {
1687                log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1688                goto create_qp_failed;
1689        }
1690
1691        memset(&conn_param, 0, sizeof(conn_param));
1692        conn_param.initiator_depth = 0;
1693
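            /* Clamp responder resources to the device's max_qp_rd_atom */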
1694        conn_param.responder_resources =
1695                info->id->device->attrs.max_qp_rd_atom
1696                        < SMBD_CM_RESPONDER_RESOURCES ?
1697                info->id->device->attrs.max_qp_rd_atom :
1698                SMBD_CM_RESPONDER_RESOURCES;
1699        info->responder_resources = conn_param.responder_resources;
1700        log_rdma_mr(INFO, "responder_resources=%d\n",
1701                info->responder_resources);
1702
1703        /* Need to send IRD/ORD in private data for iWARP */
1704        info->id->device->ops.get_port_immutable(
1705                info->id->device, info->id->port_num, &port_immutable);
1706        if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1707                ird_ord_hdr[0] = info->responder_resources;
1708                ird_ord_hdr[1] = 1;
1709                conn_param.private_data = ird_ord_hdr;
1710                conn_param.private_data_len = sizeof(ird_ord_hdr);
1711        } else {
1712                conn_param.private_data = NULL;
1713                conn_param.private_data_len = 0;
1714        }
1715
1716        conn_param.retry_count = SMBD_CM_RETRY;
1717        conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1718        conn_param.flow_control = 0;
1719
1720        log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1721                &addr_in->sin_addr, port);
1722
1723        init_waitqueue_head(&info->conn_wait);
1724        init_waitqueue_head(&info->disconn_wait);
1725        init_waitqueue_head(&info->wait_reassembly_queue);
1726        rc = rdma_connect(info->id, &conn_param);
1727        if (rc) {
1728                log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1729                goto rdma_connect_failed;
1730        }
1731
1732        wait_event_interruptible(
1733                info->conn_wait, info->transport_status != SMBD_CONNECTING);
1734
1735        if (info->transport_status != SMBD_CONNECTED) {
1736                log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1737                goto rdma_connect_failed;
1738        }
1739
1740        log_rdma_event(INFO, "rdma_connect connected\n");
1741
1742        rc = allocate_caches_and_workqueue(info);
1743        if (rc) {
1744                log_rdma_event(ERR, "cache allocation failed\n");
1745                goto allocate_cache_failed;
1746        }
1747
1748        init_waitqueue_head(&info->wait_send_queue);
1749        INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1750        INIT_DELAYED_WORK(&info->send_immediate_work, send_immediate_work);
1751        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1752                info->keep_alive_interval*HZ);
1753
1754        init_waitqueue_head(&info->wait_send_pending);
1755        atomic_set(&info->send_pending, 0);
1756
1757        init_waitqueue_head(&info->wait_send_payload_pending);
1758        atomic_set(&info->send_payload_pending, 0);
1759
1760        INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1761        INIT_WORK(&info->recv_done_work, smbd_recv_done_work);
1762        INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1763        info->new_credits_offered = 0;
1764        spin_lock_init(&info->lock_new_credits_offered);
1765
1766        rc = smbd_negotiate(info);
1767        if (rc) {
1768                log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1769                goto negotiation_failed;
1770        }
1771
1772        rc = allocate_mr_list(info);
1773        if (rc) {
1774                log_rdma_mr(ERR, "memory registration allocation failed\n");
1775                goto allocate_mr_failed;
1776        }
1777
1778        return info;
1779
1780allocate_mr_failed:
1781        /* At this point, we need to do a full transport shutdown */
1782        smbd_destroy(server);
1783        return NULL;
1784
1785negotiation_failed:
1786        cancel_delayed_work_sync(&info->idle_timer_work);
1787        destroy_caches_and_workqueue(info);
1788        info->transport_status = SMBD_NEGOTIATE_FAILED;
1789        init_waitqueue_head(&info->conn_wait);
1790        rdma_disconnect(info->id);
1791        wait_event(info->conn_wait,
1792                info->transport_status == SMBD_DISCONNECTED);
1793
1794allocate_cache_failed:
1795rdma_connect_failed:
1796        rdma_destroy_qp(info->id);
1797
1798create_qp_failed:
1799alloc_cq_failed:
1800        if (info->send_cq)
1801                ib_free_cq(info->send_cq);
1802        if (info->recv_cq)
1803                ib_free_cq(info->recv_cq);
1804
1805config_failed:
1806        ib_dealloc_pd(info->pd);
1807        rdma_destroy_id(info->id);
1808
1809create_id_failed:
1810        kfree(info);
1811        return NULL;
1812}
1813
1814struct smbd_connection *smbd_get_connection(
1815        struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1816{
1817        struct smbd_connection *ret;
1818        int port = SMBD_PORT;
1819
1820try_again:
1821        ret = _smbd_get_connection(server, dstaddr, port);
1822
1823        /* Try SMB_PORT if SMBD_PORT doesn't work */
1824        if (!ret && port == SMBD_PORT) {
1825                port = SMB_PORT;
1826                goto try_again;
1827        }
1828        return ret;
1829}
1830
1831/*
1832 * Receive data from receive reassembly queue
1833 * All the incoming data packets are placed in reassembly queue
1834 * buf: the buffer to read data into
1835 * size: the length of data to read
1836 * return value: actual data read
1837 * Note: this implementation copies the data from the reassembly queue to the
1838 * receive buffers used by the upper layer. This is not the optimal code path.
1839 * A better way is to not have the upper layer allocate its receive buffers,
1840 * but rather borrow the buffer from the reassembly queue and return it after
1841 * the data is consumed. But this would require more changes to upper layer
1842 * code, and would need to handle packet boundaries during reassembly.
1843 */
1844static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1845                unsigned int size)
1846{
1847        struct smbd_response *response;
1848        struct smbd_data_transfer *data_transfer;
1849        int to_copy, to_read, data_read, offset;
1850        u32 data_length, remaining_data_length, data_offset;
1851        int rc;
1852
1853again:
1854        /*
1855         * No need to hold the reassembly queue lock all the time as we are
1856         * the only one reading from the front of the queue. The transport
1857         * may add more entries to the back of the queue at the same time
1858         */
1859        log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1860                info->reassembly_data_length);
1861        if (info->reassembly_data_length >= size) {
1862                int queue_length;
1863                int queue_removed = 0;
1864
1865                /*
1866                 * Need to make sure reassembly_data_length is read before
1867                 * reading reassembly_queue_length and calling
1868                 * _get_first_reassembly. This call is lock free
1869                 * as we never read entries at the end of the queue, which
1870                 * are updated in SOFTIRQ context as more data is received
1871                 */
1872                virt_rmb();
1873                queue_length = info->reassembly_queue_length;
1874                data_read = 0;
1875                to_read = size;
1876                offset = info->first_entry_offset;
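                    /*
                     * Copy out of the head of the queue; "offset" tracks how
                     * much of the first queued packet a previous read has
                     * already consumed.
                     */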
1877                while (data_read < size) {
1878                        response = _get_first_reassembly(info);
1879                        data_transfer = smbd_response_payload(response);
1880                        data_length = le32_to_cpu(data_transfer->data_length);
1881                        remaining_data_length =
1882                                le32_to_cpu(
1883                                        data_transfer->remaining_data_length);
1884                        data_offset = le32_to_cpu(data_transfer->data_offset);
1885
1886                        /*
1887                         * The upper layer expects the RFC1002 length at the
1888                         * beginning of the payload. Return it to indicate
1889                         * the total length of the packet. This minimizes the
1890                         * changes to upper layer packet processing logic. This
1891                         * will eventually be removed when an intermediate
1892                         * transport layer is added
1893                         */
1894                        if (response->first_segment && size == 4) {
1895                                unsigned int rfc1002_len =
1896                                        data_length + remaining_data_length;
1897                                *((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1898                                data_read = 4;
1899                                response->first_segment = false;
1900                                log_read(INFO, "returning rfc1002 length %d\n",
1901                                        rfc1002_len);
1902                                goto read_rfc1002_done;
1903                        }
1904
1905                        to_copy = min_t(int, data_length - offset, to_read);
1906                        memcpy(
1907                                buf + data_read,
1908                                (char *)data_transfer + data_offset + offset,
1909                                to_copy);
1910
1911                        /* move on to the next buffer? */
1912                        if (to_copy == data_length - offset) {
1913                                queue_length--;
1914                                /*
1915                                 * No need to lock if we are not at the
1916                                 * end of the queue
1917                                 */
1918                                if (queue_length)
1919                                        list_del(&response->list);
1920                                else {
1921                                        spin_lock_irq(
1922                                                &info->reassembly_queue_lock);
1923                                        list_del(&response->list);
1924                                        spin_unlock_irq(
1925                                                &info->reassembly_queue_lock);
1926                                }
1927                                queue_removed++;
1928                                info->count_reassembly_queue--;
1929                                info->count_dequeue_reassembly_queue++;
1930                                put_receive_buffer(info, response);
1931                                offset = 0;
1932                                log_read(INFO, "put_receive_buffer offset=0\n");
1933                        } else
1934                                offset += to_copy;
1935
1936                        to_read -= to_copy;
1937                        data_read += to_copy;
1938
1939                        log_read(INFO, "_get_first_reassembly memcpy %d bytes "
1940                                "data_transfer_length-offset=%d after that "
1941                                "to_read=%d data_read=%d offset=%d\n",
1942                                to_copy, data_length - offset,
1943                                to_read, data_read, offset);
1944                }
1945
1946                spin_lock_irq(&info->reassembly_queue_lock);
1947                info->reassembly_data_length -= data_read;
1948                info->reassembly_queue_length -= queue_removed;
1949                spin_unlock_irq(&info->reassembly_queue_lock);
1950
1951                info->first_entry_offset = offset;
1952                log_read(INFO, "returning to thread data_read=%d "
1953                        "reassembly_data_length=%d first_entry_offset=%d\n",
1954                        data_read, info->reassembly_data_length,
1955                        info->first_entry_offset);
1956read_rfc1002_done:
1957                return data_read;
1958        }
1959
1960        log_read(INFO, "wait_event on more data\n");
1961        rc = wait_event_interruptible(
1962                info->wait_reassembly_queue,
1963                info->reassembly_data_length >= size ||
1964                        info->transport_status != SMBD_CONNECTED);
1965        /* Don't return any data if interrupted */
1966        if (rc)
1967                return rc;
1968
1969        if (info->transport_status != SMBD_CONNECTED) {
1970                log_read(ERR, "disconnected\n");
1971                return 0;
1972        }
1973
1974        goto again;
1975}
1976
1977/*
1978 * Receive a page from receive reassembly queue
1979 * page: the page to read data into
1980 * to_read: the length of data to read
1981 * return value: actual data read
1982 */
1983static int smbd_recv_page(struct smbd_connection *info,
1984                struct page *page, unsigned int page_offset,
1985                unsigned int to_read)
1986{
1987        int ret;
1988        char *to_address;
1989        void *page_address;
1990
1991        /* make sure we have the page ready for read */
1992        ret = wait_event_interruptible(
1993                info->wait_reassembly_queue,
1994                info->reassembly_data_length >= to_read ||
1995                        info->transport_status != SMBD_CONNECTED);
1996        if (ret)
1997                return ret;
1998
1999        /* now we can read from reassembly queue and not sleep */
2000        page_address = kmap_atomic(page);
2001        to_address = (char *) page_address + page_offset;
2002
2003        log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
2004                page, to_address, to_read);
2005
2006        ret = smbd_recv_buf(info, to_address, to_read);
2007        kunmap_atomic(page_address);
2008
2009        return ret;
2010}
2011
2012/*
2013 * Receive data from transport
2014 * msg: a msghdr pointing to the buffer; can be ITER_KVEC or ITER_BVEC
2015 * return: total bytes read, or 0. SMB Direct will not do a partial read.
2016 */
2017int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
2018{
2019        char *buf;
2020        struct page *page;
2021        unsigned int to_read, page_offset;
2022        int rc;
2023
2024        if (iov_iter_rw(&msg->msg_iter) == WRITE) {
2025                /* It's a bug in the upper layer to get here */
2026                cifs_dbg(VFS, "CIFS: invalid msg iter dir %u\n",
2027                         iov_iter_rw(&msg->msg_iter));
2028                rc = -EINVAL;
2029                goto out;
2030        }
2031
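            /* Only the first kvec/bvec segment of the iterator is consumed here */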
2032        switch (iov_iter_type(&msg->msg_iter)) {
2033        case ITER_KVEC:
2034                buf = msg->msg_iter.kvec->iov_base;
2035                to_read = msg->msg_iter.kvec->iov_len;
2036                rc = smbd_recv_buf(info, buf, to_read);
2037                break;
2038
2039        case ITER_BVEC:
2040                page = msg->msg_iter.bvec->bv_page;
2041                page_offset = msg->msg_iter.bvec->bv_offset;
2042                to_read = msg->msg_iter.bvec->bv_len;
2043                rc = smbd_recv_page(info, page, page_offset, to_read);
2044                break;
2045
2046        default:
2047                /* It's a bug in the upper layer to get here */
2048                cifs_dbg(VFS, "CIFS: invalid msg type %d\n",
2049                         iov_iter_type(&msg->msg_iter));
2050                rc = -EINVAL;
2051        }
2052
2053out:
2054        /* SMBDirect will read it all or nothing */
2055        if (rc > 0)
2056                msg->msg_iter.count = 0;
2057        return rc;
2058}
2059
2060/*
2061 * Send data to transport
2062 * Each rqst is transported as an SMBDirect payload
2063 * rqst: the data to write
2064 * return value: 0 on success, otherwise error code
2065 */
2066int smbd_send(struct TCP_Server_Info *server,
2067        int num_rqst, struct smb_rqst *rqst_array)
2068{
2069        struct smbd_connection *info = server->smbd_conn;
2070        struct kvec vec;
2071        int nvecs;
2072        int size;
2073        unsigned int buflen, remaining_data_length;
2074        int start, i, j;
2075        int max_iov_size =
2076                info->max_send_size - sizeof(struct smbd_data_transfer);
2077        struct kvec *iov;
2078        int rc;
2079        struct smb_rqst *rqst;
2080        int rqst_idx;
2081
2082        if (info->transport_status != SMBD_CONNECTED) {
2083                rc = -EAGAIN;
2084                goto done;
2085        }
2086
2087        /*
2088         * Add in the page array if there is one. The caller needs to set
2089         * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
2090         * ends at page boundary
2091         */
2092        remaining_data_length = 0;
2093        for (i = 0; i < num_rqst; i++)
2094                remaining_data_length += smb_rqst_len(server, &rqst_array[i]);
2095
2096        if (remaining_data_length + sizeof(struct smbd_data_transfer) >
2097                info->max_fragmented_send_size) {
2098                log_write(ERR, "payload size %d > max size %d\n",
2099                        remaining_data_length, info->max_fragmented_send_size);
2100                rc = -EINVAL;
2101                goto done;
2102        }
2103
2104        log_write(INFO, "num_rqst=%d total length=%u\n",
2105                        num_rqst, remaining_data_length);
2106
2107        rqst_idx = 0;
2108next_rqst:
2109        rqst = &rqst_array[rqst_idx];
2110        iov = rqst->rq_iov;
2111
2112        cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
2113                rqst_idx, smb_rqst_len(server, rqst));
2114        for (i = 0; i < rqst->rq_nvec; i++)
2115                dump_smb(iov[i].iov_base, iov[i].iov_len);
2116
2117
2118        log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d "
2119                "rq_tailsz=%d buflen=%lu\n",
2120                rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2121                rqst->rq_tailsz, smb_rqst_len(server, rqst));
2122
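            /*
             * Coalesce consecutive iovs into one send of at most max_iov_size;
             * an iov larger than max_iov_size is split into max_iov_size
             * chunks. remaining_data_length is decremented before each post so
             * the packet header tells the peer how much fragmented payload
             * still follows.
             */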
2123        start = i = 0;
2124        buflen = 0;
2125        while (true) {
2126                buflen += iov[i].iov_len;
2127                if (buflen > max_iov_size) {
2128                        if (i > start) {
2129                                remaining_data_length -=
2130                                        (buflen-iov[i].iov_len);
2131                                log_write(INFO, "sending iov[] from start=%d "
2132                                        "i=%d nvecs=%d "
2133                                        "remaining_data_length=%d\n",
2134                                        start, i, i-start,
2135                                        remaining_data_length);
2136                                rc = smbd_post_send_data(
2137                                        info, &iov[start], i-start,
2138                                        remaining_data_length);
2139                                if (rc)
2140                                        goto done;
2141                        } else {
2142                                /* iov[start] is too big, break it */
2143                                nvecs = (buflen+max_iov_size-1)/max_iov_size;
2144                                log_write(INFO, "iov[%d] iov_base=%p buflen=%d"
2145                                        " break to %d vectors\n",
2146                                        start, iov[start].iov_base,
2147                                        buflen, nvecs);
2148                                for (j = 0; j < nvecs; j++) {
2149                                        vec.iov_base =
2150                                                (char *)iov[start].iov_base +
2151                                                j*max_iov_size;
2152                                        vec.iov_len = max_iov_size;
2153                                        if (j == nvecs-1)
2154                                                vec.iov_len =
2155                                                        buflen -
2156                                                        max_iov_size*(nvecs-1);
2157                                        remaining_data_length -= vec.iov_len;
2158                                        log_write(INFO,
2159                                                "sending vec j=%d iov_base=%p"
2160                                                " iov_len=%zu "
2161                                                "remaining_data_length=%d\n",
2162                                                j, vec.iov_base, vec.iov_len,
2163                                                remaining_data_length);
2164                                        rc = smbd_post_send_data(
2165                                                info, &vec, 1,
2166                                                remaining_data_length);
2167                                        if (rc)
2168                                                goto done;
2169                                }
2170                                i++;
2171                                if (i == rqst->rq_nvec)
2172                                        break;
2173                        }
2174                        start = i;
2175                        buflen = 0;
2176                } else {
2177                        i++;
2178                        if (i == rqst->rq_nvec) {
2179                                /* send out all remaining vecs */
2180                                remaining_data_length -= buflen;
2181                                log_write(INFO,
2182                                        "sending iov[] from start=%d i=%d "
2183                                        "nvecs=%d remaining_data_length=%d\n",
2184                                        start, i, i-start,
2185                                        remaining_data_length);
2186                                rc = smbd_post_send_data(info, &iov[start],
2187                                        i-start, remaining_data_length);
2188                                if (rc)
2189                                        goto done;
2190                                break;
2191                        }
2192                }
2193                log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2194        }
2195
2196        /* now sending pages if there are any */
2197        for (i = 0; i < rqst->rq_npages; i++) {
2198                unsigned int offset;
2199
2200                rqst_page_get_length(rqst, i, &buflen, &offset);
2201                nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2202                log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2203                        buflen, nvecs);
2204                for (j = 0; j < nvecs; j++) {
2205                        size = max_iov_size;
2206                        if (j == nvecs-1)
2207                                size = buflen - j*max_iov_size;
2208                        remaining_data_length -= size;
2209                        log_write(INFO, "sending pages i=%d offset=%d size=%d"
2210                                " remaining_data_length=%d\n",
2211                                i, j*max_iov_size+offset, size,
2212                                remaining_data_length);
2213                        rc = smbd_post_send_page(
2214                                info, rqst->rq_pages[i],
2215                                j*max_iov_size + offset,
2216                                size, remaining_data_length);
2217                        if (rc)
2218                                goto done;
2219                }
2220        }
2221
2222        rqst_idx++;
2223        if (rqst_idx < num_rqst)
2224                goto next_rqst;
2225
2226done:
2227        /*
2228         * As an optimization, we don't wait for individual I/O to finish
2229         * before sending the next one.
2230         * Send them all and wait for the pending send count to reach 0;
2231         * that means all the I/Os have completed and we are good to return
2232         */
2233
2234        wait_event(info->wait_send_payload_pending,
2235                atomic_read(&info->send_payload_pending) == 0);
2236
2237        return rc;
2238}
2239
2240static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2241{
2242        struct smbd_mr *mr;
2243        struct ib_cqe *cqe;
2244
2245        if (wc->status) {
2246                log_rdma_mr(ERR, "status=%d\n", wc->status);
2247                cqe = wc->wr_cqe;
2248                mr = container_of(cqe, struct smbd_mr, cqe);
2249                smbd_disconnect_rdma_connection(mr->conn);
2250        }
2251}
2252
2253/*
2254 * The work queue function that recovers MRs
2255 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2256 * again. Both calls are slow, so finish them in a workqueue. This will not
2257 * block the I/O path.
2258 * There is a single workqueue that recovers MRs, so there is no need to lock,
2259 * as the I/O requests calling smbd_register_mr never update the links in the
2260 * mr_list.
2261 */
2262static void smbd_mr_recovery_work(struct work_struct *work)
2263{
2264        struct smbd_connection *info =
2265                container_of(work, struct smbd_connection, mr_recovery_work);
2266        struct smbd_mr *smbdirect_mr;
2267        int rc;
2268
2269        list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2270                if (smbdirect_mr->state == MR_INVALIDATED)
2271                        ib_dma_unmap_sg(
2272                                info->id->device, smbdirect_mr->sgl,
2273                                smbdirect_mr->sgl_count,
2274                                smbdirect_mr->dir);
2275                else if (smbdirect_mr->state == MR_ERROR) {
2276
2277                        /* recover this MR entry */
2278                        rc = ib_dereg_mr(smbdirect_mr->mr);
2279                        if (rc) {
2280                                log_rdma_mr(ERR,
2281                                        "ib_dereg_mr failed rc=%x\n",
2282                                        rc);
2283                                smbd_disconnect_rdma_connection(info);
2284                                continue;
2285                        }
2286
2287                        smbdirect_mr->mr = ib_alloc_mr(
2288                                info->pd, info->mr_type,
2289                                info->max_frmr_depth);
2290                        if (IS_ERR(smbdirect_mr->mr)) {
2291                                log_rdma_mr(ERR,
2292                                        "ib_alloc_mr failed mr_type=%x "
2293                                        "max_frmr_depth=%x\n",
2294                                        info->mr_type,
2295                                        info->max_frmr_depth);
2296                                smbd_disconnect_rdma_connection(info);
2297                                continue;
2298                        }
2299                } else
2300                        /* This MR is being used, don't recover it */
2301                        continue;
2302
2303                smbdirect_mr->state = MR_READY;
2304
2305                /* smbdirect_mr->state is updated by this function
2306                 * and is read and updated by I/O issuing CPUs trying
2307                 * to get an MR; the call to atomic_inc_return
2308                 * implies a memory barrier and guarantees this
2309                 * value is updated before waking up any calls to
2310                 * get_mr() from the I/O issuing CPUs
2311                 */
2312                if (atomic_inc_return(&info->mr_ready_count) == 1)
2313                        wake_up_interruptible(&info->wait_mr);
2314        }
2315}
2316
2317static void destroy_mr_list(struct smbd_connection *info)
2318{
2319        struct smbd_mr *mr, *tmp;
2320
2321        cancel_work_sync(&info->mr_recovery_work);
2322        list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2323                if (mr->state == MR_INVALIDATED)
2324                        ib_dma_unmap_sg(info->id->device, mr->sgl,
2325                                mr->sgl_count, mr->dir);
2326                ib_dereg_mr(mr->mr);
2327                kfree(mr->sgl);
2328                kfree(mr);
2329        }
2330}
2331
2332/*
2333 * Allocate MRs used for RDMA read/write
2334 * The number of MRs will not exceed hardware capability in responder_resources
2335 * All MRs are kept in mr_list. The MR can be recovered after it's used
2336 * Recovery is done in smbd_mr_recovery_work. The content of list entry changes
2337 * as MRs are used and recovered for I/O, but the list links will not change
2338 */
2339static int allocate_mr_list(struct smbd_connection *info)
2340{
2341        int i;
2342        struct smbd_mr *smbdirect_mr, *tmp;
2343
2344        INIT_LIST_HEAD(&info->mr_list);
2345        init_waitqueue_head(&info->wait_mr);
2346        spin_lock_init(&info->mr_list_lock);
2347        atomic_set(&info->mr_ready_count, 0);
2348        atomic_set(&info->mr_used_count, 0);
2349        init_waitqueue_head(&info->wait_for_mr_cleanup);
2350        /* Allocate twice as many MRs as the hardware responder_resources */
2351        for (i = 0; i < info->responder_resources * 2; i++) {
2352                smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2353                if (!smbdirect_mr)
2354                        goto out;
2355                smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2356                                        info->max_frmr_depth);
2357                if (IS_ERR(smbdirect_mr->mr)) {
2358                        log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
2359                                "max_frmr_depth=%x\n",
2360                                info->mr_type, info->max_frmr_depth);
2361                        goto out;
2362                }
2363                smbdirect_mr->sgl = kcalloc(
2364                                        info->max_frmr_depth,
2365                                        sizeof(struct scatterlist),
2366                                        GFP_KERNEL);
2367                if (!smbdirect_mr->sgl) {
2368                        log_rdma_mr(ERR, "failed to allocate sgl\n");
2369                        ib_dereg_mr(smbdirect_mr->mr);
2370                        goto out;
2371                }
2372                smbdirect_mr->state = MR_READY;
2373                smbdirect_mr->conn = info;
2374
2375                list_add_tail(&smbdirect_mr->list, &info->mr_list);
2376                atomic_inc(&info->mr_ready_count);
2377        }
2378        INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2379        return 0;
2380
2381out:
2382        kfree(smbdirect_mr);
2383
2384        list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2385                ib_dereg_mr(smbdirect_mr->mr);
2386                kfree(smbdirect_mr->sgl);
2387                kfree(smbdirect_mr);
2388        }
2389        return -ENOMEM;
2390}
2391
2392/*
2393 * Get an MR from mr_list. This function waits until there is at least one
2394 * MR available in the list. It may access the list while
2395 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2396 * as they never modify the same places. However, there may be several CPUs
2397 * issuing I/O and trying to get an MR at the same time; mr_list_lock is used
2398 * to protect against this.
2399 */
2400static struct smbd_mr *get_mr(struct smbd_connection *info)
2401{
2402        struct smbd_mr *ret;
2403        int rc;
2404again:
2405        rc = wait_event_interruptible(info->wait_mr,
2406                atomic_read(&info->mr_ready_count) ||
2407                info->transport_status != SMBD_CONNECTED);
2408        if (rc) {
2409                log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2410                return NULL;
2411        }
2412
2413        if (info->transport_status != SMBD_CONNECTED) {
2414                log_rdma_mr(ERR, "info->transport_status=%x\n",
2415                        info->transport_status);
2416                return NULL;
2417        }
2418
2419        spin_lock(&info->mr_list_lock);
2420        list_for_each_entry(ret, &info->mr_list, list) {
2421                if (ret->state == MR_READY) {
2422                        ret->state = MR_REGISTERED;
2423                        spin_unlock(&info->mr_list_lock);
2424                        atomic_dec(&info->mr_ready_count);
2425                        atomic_inc(&info->mr_used_count);
2426                        return ret;
2427                }
2428        }
2429
2430        spin_unlock(&info->mr_list_lock);
2431        /*
2432         * It is possible that we could fail to get an MR because other
2433         * processes may be acquiring an MR at the same time. If so, retry.
2434         */
2435        goto again;
2436}
2437
2438/*
2439 * Register memory for RDMA read/write
2440 * pages[]: the list of pages to register memory with
2441 * num_pages: the number of pages to register
2442 * tailsz: if non-zero, the bytes to register in the last page
2443 * writing: true if this is an RDMA write (SMB read), false for an RDMA read
2444 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2445 * return value: the MR registered, NULL if failed.
2446 */
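    /*
     * Typical usage (sketch): the caller registers the pages backing an I/O,
     * describes the registered buffer (rkey, address, length) to the peer in
     * the SMB2 request, and calls smbd_deregister_mr() once the peer's RDMA
     * read/write has completed.
     */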
2447struct smbd_mr *smbd_register_mr(
2448        struct smbd_connection *info, struct page *pages[], int num_pages,
2449        int offset, int tailsz, bool writing, bool need_invalidate)
2450{
2451        struct smbd_mr *smbdirect_mr;
2452        int rc, i;
2453        enum dma_data_direction dir;
2454        struct ib_reg_wr *reg_wr;
2455
2456        if (num_pages > info->max_frmr_depth) {
2457                log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2458                        num_pages, info->max_frmr_depth);
2459                return NULL;
2460        }
2461
2462        smbdirect_mr = get_mr(info);
2463        if (!smbdirect_mr) {
2464                log_rdma_mr(ERR, "get_mr returning NULL\n");
2465                return NULL;
2466        }
2467        smbdirect_mr->need_invalidate = need_invalidate;
2468        smbdirect_mr->sgl_count = num_pages;
2469        sg_init_table(smbdirect_mr->sgl, num_pages);
2470
2471        log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
2472                        num_pages, offset, tailsz);
2473
2474        if (num_pages == 1) {
2475                sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
2476                goto skip_multiple_pages;
2477        }
2478
2479        /* We have at least two pages to register */
2480        sg_set_page(
2481                &smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
2482        i = 1;
2483        while (i < num_pages - 1) {
2484                sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2485                i++;
2486        }
2487        sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2488                tailsz ? tailsz : PAGE_SIZE, 0);
2489
2490skip_multiple_pages:
2491        dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2492        smbdirect_mr->dir = dir;
2493        rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2494        if (!rc) {
2495                log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2496                        num_pages, dir, rc);
2497                goto dma_map_error;
2498        }
2499
2500        rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2501                NULL, PAGE_SIZE);
2502        if (rc != num_pages) {
2503                log_rdma_mr(ERR,
2504                        "ib_map_mr_sg failed rc = %d num_pages = %x\n",
2505                        rc, num_pages);
2506                goto map_mr_error;
2507        }
2508
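            /*
             * Refresh the rkey before reuse, then build the fast registration
             * work request; remote access rights depend on the I/O direction.
             */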
2509        ib_update_fast_reg_key(smbdirect_mr->mr,
2510                ib_inc_rkey(smbdirect_mr->mr->rkey));
2511        reg_wr = &smbdirect_mr->wr;
2512        reg_wr->wr.opcode = IB_WR_REG_MR;
2513        smbdirect_mr->cqe.done = register_mr_done;
2514        reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2515        reg_wr->wr.num_sge = 0;
2516        reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2517        reg_wr->mr = smbdirect_mr->mr;
2518        reg_wr->key = smbdirect_mr->mr->rkey;
2519        reg_wr->access = writing ?
2520                        IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2521                        IB_ACCESS_REMOTE_READ;
2522
2523        /*
2524         * There is no need to wait for completion of the ib_post_send
2525         * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2526         * on the next ib_post_send when we actually send I/O to the remote peer
2527         */
2528        rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
2529        if (!rc)
2530                return smbdirect_mr;
2531
2532        log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2533                rc, reg_wr->key);
2534
2535        /* If all failed, attempt to recover this MR by setting it to MR_ERROR */
2536map_mr_error:
2537        ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2538                smbdirect_mr->sgl_count, smbdirect_mr->dir);
2539
2540dma_map_error:
2541        smbdirect_mr->state = MR_ERROR;
2542        if (atomic_dec_and_test(&info->mr_used_count))
2543                wake_up(&info->wait_for_mr_cleanup);
2544
2545        smbd_disconnect_rdma_connection(info);
2546
2547        return NULL;
2548}
2549
2550static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2551{
2552        struct smbd_mr *smbdirect_mr;
2553        struct ib_cqe *cqe;
2554
2555        cqe = wc->wr_cqe;
2556        smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2557        smbdirect_mr->state = MR_INVALIDATED;
2558        if (wc->status != IB_WC_SUCCESS) {
2559                log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2560                smbdirect_mr->state = MR_ERROR;
2561        }
2562        complete(&smbdirect_mr->invalidate_done);
2563}
2564
2565/*
2566 * Deregister an MR after I/O is done
2567 * This function may wait if remote invalidation is not used
2568 * and we have to locally invalidate the buffer to prevent data from being
2569 * modified by the remote peer after the upper layer consumes it
2570 */
2571int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2572{
2573        struct ib_send_wr *wr;
2574        struct smbd_connection *info = smbdirect_mr->conn;
2575        int rc = 0;
2576
2577        if (smbdirect_mr->need_invalidate) {
2578                /* Need to finish local invalidation before returning */
2579                wr = &smbdirect_mr->inv_wr;
2580                wr->opcode = IB_WR_LOCAL_INV;
2581                smbdirect_mr->cqe.done = local_inv_done;
2582                wr->wr_cqe = &smbdirect_mr->cqe;
2583                wr->num_sge = 0;
2584                wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2585                wr->send_flags = IB_SEND_SIGNALED;
2586
2587                init_completion(&smbdirect_mr->invalidate_done);
2588                rc = ib_post_send(info->id->qp, wr, NULL);
2589                if (rc) {
2590                        log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2591                        smbd_disconnect_rdma_connection(info);
2592                        goto done;
2593                }
2594                wait_for_completion(&smbdirect_mr->invalidate_done);
2595                smbdirect_mr->need_invalidate = false;
2596        } else
2597                /*
2598                 * For remote invalidation, just set it to MR_INVALIDATED
2599                 * and defer to mr_recovery_work to recover the MR for next use
2600                 */
2601                smbdirect_mr->state = MR_INVALIDATED;
2602
2603        /*
2604         * Schedule the work to do MR recovery for future I/Os
2605         * MR recovery is slow and we don't want it to block the current I/O
2606         */
2607        queue_work(info->workqueue, &info->mr_recovery_work);
2608
2609done:
2610        if (atomic_dec_and_test(&info->mr_used_count))
2611                wake_up(&info->wait_for_mr_cleanup);
2612
2613        return rc;
2614}
2615