linux/fs/cifs/smbdirect.c
   1/*
   2 *   Copyright (C) 2017, Microsoft Corporation.
   3 *
   4 *   Author(s): Long Li <longli@microsoft.com>
   5 *
   6 *   This program is free software;  you can redistribute it and/or modify
   7 *   it under the terms of the GNU General Public License as published by
   8 *   the Free Software Foundation; either version 2 of the License, or
   9 *   (at your option) any later version.
  10 *
  11 *   This program is distributed in the hope that it will be useful,
  12 *   but WITHOUT ANY WARRANTY;  without even the implied warranty of
  13 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
  14 *   the GNU General Public License for more details.
  15 */
  16#include <linux/module.h>
  17#include <linux/highmem.h>
  18#include "smbdirect.h"
  19#include "cifs_debug.h"
  20#include "cifsproto.h"
  21#include "smb2proto.h"
  22
  23static struct smbd_response *get_empty_queue_buffer(
  24                struct smbd_connection *info);
  25static struct smbd_response *get_receive_buffer(
  26                struct smbd_connection *info);
  27static void put_receive_buffer(
  28                struct smbd_connection *info,
  29                struct smbd_response *response);
  30static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  31static void destroy_receive_buffers(struct smbd_connection *info);
  32
  33static void put_empty_packet(
  34                struct smbd_connection *info, struct smbd_response *response);
  35static void enqueue_reassembly(
  36                struct smbd_connection *info,
  37                struct smbd_response *response, int data_length);
  38static struct smbd_response *_get_first_reassembly(
  39                struct smbd_connection *info);
  40
  41static int smbd_post_recv(
  42                struct smbd_connection *info,
  43                struct smbd_response *response);
  44
  45static int smbd_post_send_empty(struct smbd_connection *info);
  46static int smbd_post_send_data(
  47                struct smbd_connection *info,
  48                struct kvec *iov, int n_vec, int remaining_data_length);
  49static int smbd_post_send_page(struct smbd_connection *info,
  50                struct page *page, unsigned long offset,
  51                size_t size, int remaining_data_length);
  52
  53static void destroy_mr_list(struct smbd_connection *info);
  54static int allocate_mr_list(struct smbd_connection *info);
  55
  56/* SMBD version number */
  57#define SMBD_V1 0x0100
  58
  59/* Port numbers for SMBD transport */
  60#define SMB_PORT        445
  61#define SMBD_PORT       5445
  62
  63/* Address lookup and resolve timeout in ms */
  64#define RDMA_RESOLVE_TIMEOUT    5000
  65
  66/* SMBD negotiation timeout in seconds */
  67#define SMBD_NEGOTIATE_TIMEOUT  120
  68
   69/* SMBD minimum receive size and fragmented size defined in [MS-SMBD] */
  70#define SMBD_MIN_RECEIVE_SIZE           128
  71#define SMBD_MIN_FRAGMENTED_SIZE        131072
  72
  73/*
  74 * Default maximum number of RDMA read/write outstanding on this connection
   75 * This value may be decreased during QP creation if the hardware imposes a lower limit
  76 */
  77#define SMBD_CM_RESPONDER_RESOURCES     32
  78
  79/* Maximum number of retries on data transfer operations */
  80#define SMBD_CM_RETRY                   6
  81/* No need to retry on Receiver Not Ready since SMBD manages credits */
  82#define SMBD_CM_RNR_RETRY               0
  83
  84/*
  85 * User configurable initial values per SMBD transport connection
  86 * as defined in [MS-SMBD] 3.1.1.1
   87 * These may change after SMBD negotiation
  88 */
  89/* The local peer's maximum number of credits to grant to the peer */
  90int smbd_receive_credit_max = 255;
  91
   92/* The number of send credits the local peer requests from the remote peer */
  93int smbd_send_credit_target = 255;
  94
   95/* The maximum size of a single message that can be sent to the remote peer */
  96int smbd_max_send_size = 1364;
  97
  98/*  The maximum fragmented upper-layer payload receive size supported */
  99int smbd_max_fragmented_recv_size = 1024 * 1024;
 100
 101/*  The maximum single-message size which can be received */
 102int smbd_max_receive_size = 8192;
 103
  104/* The timeout, in seconds, before a keepalive message is sent on an idle connection */
 105int smbd_keep_alive_interval = 120;
 106
 107/*
 108 * User configurable initial values for RDMA transport
 109 * The actual values used may be lower and are limited to hardware capabilities
 110 */
  111/* Default maximum number of SGEs in an RDMA write/read */
 112int smbd_max_frmr_depth = 2048;
 113
  114/* If the payload is smaller than this many bytes, use RDMA send/recv instead of read/write */
 115int rdma_readwrite_threshold = 4096;
 116
 117/* Transport logging functions
  118 * Logging is defined as classes. They can be OR'ed to define the actual
 119 * logging level via module parameter smbd_logging_class
 120 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 121 * log_rdma_event()
 122 */
 123#define LOG_OUTGOING                    0x1
 124#define LOG_INCOMING                    0x2
 125#define LOG_READ                        0x4
 126#define LOG_WRITE                       0x8
 127#define LOG_RDMA_SEND                   0x10
 128#define LOG_RDMA_RECV                   0x20
 129#define LOG_KEEP_ALIVE                  0x40
 130#define LOG_RDMA_EVENT                  0x80
 131#define LOG_RDMA_MR                     0x100
 132static unsigned int smbd_logging_class;
 133module_param(smbd_logging_class, uint, 0644);
 134MODULE_PARM_DESC(smbd_logging_class,
 135        "Logging class for SMBD transport 0x0 to 0x100");
 136
 137#define ERR             0x0
 138#define INFO            0x1
 139static unsigned int smbd_logging_level = ERR;
 140module_param(smbd_logging_level, uint, 0644);
 141MODULE_PARM_DESC(smbd_logging_level,
 142        "Logging level for SMBD transport, 0 (default): error, 1: info");
 143
 144#define log_rdma(level, class, fmt, args...)                            \
 145do {                                                                    \
 146        if (level <= smbd_logging_level || class & smbd_logging_class)  \
 147                cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 148} while (0)
 149
 150#define log_outgoing(level, fmt, args...) \
 151                log_rdma(level, LOG_OUTGOING, fmt, ##args)
 152#define log_incoming(level, fmt, args...) \
 153                log_rdma(level, LOG_INCOMING, fmt, ##args)
 154#define log_read(level, fmt, args...)   log_rdma(level, LOG_READ, fmt, ##args)
 155#define log_write(level, fmt, args...)  log_rdma(level, LOG_WRITE, fmt, ##args)
 156#define log_rdma_send(level, fmt, args...) \
 157                log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 158#define log_rdma_recv(level, fmt, args...) \
 159                log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 160#define log_keep_alive(level, fmt, args...) \
 161                log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 162#define log_rdma_event(level, fmt, args...) \
 163                log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 164#define log_rdma_mr(level, fmt, args...) \
 165                log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 166
 167/*
 168 * Destroy the transport and related RDMA and memory resources
  169 * Need to go through all the pending counters and make sure no one is using
  170 * the transport while it is being destroyed
 171 */
 172static void smbd_destroy_rdma_work(struct work_struct *work)
 173{
 174        struct smbd_response *response;
 175        struct smbd_connection *info =
 176                container_of(work, struct smbd_connection, destroy_work);
 177        unsigned long flags;
 178
 179        log_rdma_event(INFO, "destroying qp\n");
 180        ib_drain_qp(info->id->qp);
 181        rdma_destroy_qp(info->id);
 182
 183        /* Unblock all I/O waiting on the send queue */
 184        wake_up_interruptible_all(&info->wait_send_queue);
 185
 186        log_rdma_event(INFO, "cancelling idle timer\n");
 187        cancel_delayed_work_sync(&info->idle_timer_work);
 188        log_rdma_event(INFO, "cancelling send immediate work\n");
 189        cancel_delayed_work_sync(&info->send_immediate_work);
 190
 191        log_rdma_event(INFO, "wait for all send to finish\n");
 192        wait_event(info->wait_smbd_send_pending,
 193                info->smbd_send_pending == 0);
 194
 195        log_rdma_event(INFO, "wait for all recv to finish\n");
 196        wake_up_interruptible(&info->wait_reassembly_queue);
 197        wait_event(info->wait_smbd_recv_pending,
 198                info->smbd_recv_pending == 0);
 199
 200        log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
 201        wait_event(info->wait_send_pending,
 202                atomic_read(&info->send_pending) == 0);
 203        wait_event(info->wait_send_payload_pending,
 204                atomic_read(&info->send_payload_pending) == 0);
 205
 206        log_rdma_event(INFO, "freeing mr list\n");
 207        wake_up_interruptible_all(&info->wait_mr);
 208        wait_event(info->wait_for_mr_cleanup,
 209                atomic_read(&info->mr_used_count) == 0);
 210        destroy_mr_list(info);
 211
  212        /* It's not possible for the upper layer to get to reassembly */
 213        log_rdma_event(INFO, "drain the reassembly queue\n");
 214        do {
 215                spin_lock_irqsave(&info->reassembly_queue_lock, flags);
 216                response = _get_first_reassembly(info);
 217                if (response) {
 218                        list_del(&response->list);
 219                        spin_unlock_irqrestore(
 220                                &info->reassembly_queue_lock, flags);
 221                        put_receive_buffer(info, response);
 222                } else
 223                        spin_unlock_irqrestore(&info->reassembly_queue_lock, flags);
 224        } while (response);
 225
 226        info->reassembly_data_length = 0;
 227
 228        log_rdma_event(INFO, "free receive buffers\n");
 229        wait_event(info->wait_receive_queues,
 230                info->count_receive_queue + info->count_empty_packet_queue
 231                        == info->receive_credit_max);
 232        destroy_receive_buffers(info);
 233
 234        ib_free_cq(info->send_cq);
 235        ib_free_cq(info->recv_cq);
 236        ib_dealloc_pd(info->pd);
 237        rdma_destroy_id(info->id);
 238
 239        /* free mempools */
 240        mempool_destroy(info->request_mempool);
 241        kmem_cache_destroy(info->request_cache);
 242
 243        mempool_destroy(info->response_mempool);
 244        kmem_cache_destroy(info->response_cache);
 245
 246        info->transport_status = SMBD_DESTROYED;
 247        wake_up_all(&info->wait_destroy);
 248}
 249
 250static int smbd_process_disconnected(struct smbd_connection *info)
 251{
 252        schedule_work(&info->destroy_work);
 253        return 0;
 254}
 255
 256static void smbd_disconnect_rdma_work(struct work_struct *work)
 257{
 258        struct smbd_connection *info =
 259                container_of(work, struct smbd_connection, disconnect_work);
 260
 261        if (info->transport_status == SMBD_CONNECTED) {
 262                info->transport_status = SMBD_DISCONNECTING;
 263                rdma_disconnect(info->id);
 264        }
 265}
 266
 267static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 268{
 269        queue_work(info->workqueue, &info->disconnect_work);
 270}
 271
 272/* Upcall from RDMA CM */
 273static int smbd_conn_upcall(
 274                struct rdma_cm_id *id, struct rdma_cm_event *event)
 275{
 276        struct smbd_connection *info = id->context;
 277
 278        log_rdma_event(INFO, "event=%d status=%d\n",
 279                event->event, event->status);
 280
 281        switch (event->event) {
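             /*
              * Address and route resolution results are reported back to
              * smbd_create_id(), which is blocked on ri_done
              */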
 282        case RDMA_CM_EVENT_ADDR_RESOLVED:
 283        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 284                info->ri_rc = 0;
 285                complete(&info->ri_done);
 286                break;
 287
 288        case RDMA_CM_EVENT_ADDR_ERROR:
 289                info->ri_rc = -EHOSTUNREACH;
 290                complete(&info->ri_done);
 291                break;
 292
 293        case RDMA_CM_EVENT_ROUTE_ERROR:
 294                info->ri_rc = -ENETUNREACH;
 295                complete(&info->ri_done);
 296                break;
 297
 298        case RDMA_CM_EVENT_ESTABLISHED:
 299                log_rdma_event(INFO, "connected event=%d\n", event->event);
 300                info->transport_status = SMBD_CONNECTED;
 301                wake_up_interruptible(&info->conn_wait);
 302                break;
 303
 304        case RDMA_CM_EVENT_CONNECT_ERROR:
 305        case RDMA_CM_EVENT_UNREACHABLE:
 306        case RDMA_CM_EVENT_REJECTED:
 307                log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 308                info->transport_status = SMBD_DISCONNECTED;
 309                wake_up_interruptible(&info->conn_wait);
 310                break;
 311
 312        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 313        case RDMA_CM_EVENT_DISCONNECTED:
  314                /* This happens when we fail the negotiation */
 315                if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 316                        info->transport_status = SMBD_DISCONNECTED;
 317                        wake_up(&info->conn_wait);
 318                        break;
 319                }
 320
 321                info->transport_status = SMBD_DISCONNECTED;
 322                smbd_process_disconnected(info);
 323                break;
 324
 325        default:
 326                break;
 327        }
 328
 329        return 0;
 330}
 331
 332/* Upcall from RDMA QP */
 333static void
 334smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 335{
 336        struct smbd_connection *info = context;
 337
 338        log_rdma_event(ERR, "%s on device %s info %p\n",
 339                ib_event_msg(event->event), event->device->name, info);
 340
 341        switch (event->event) {
 342        case IB_EVENT_CQ_ERR:
 343        case IB_EVENT_QP_FATAL:
 344                smbd_disconnect_rdma_connection(info);
 345
 346        default:
 347                break;
 348        }
 349}
 350
 351static inline void *smbd_request_payload(struct smbd_request *request)
 352{
 353        return (void *)request->packet;
 354}
 355
 356static inline void *smbd_response_payload(struct smbd_response *response)
 357{
 358        return (void *)response->packet;
 359}
 360
 361/* Called when a RDMA send is done */
 362static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 363{
 364        int i;
 365        struct smbd_request *request =
 366                container_of(wc->wr_cqe, struct smbd_request, cqe);
 367
 368        log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 369                request, wc->status);
 370
 371        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 372                log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 373                        wc->status, wc->opcode);
 374                smbd_disconnect_rdma_connection(request->info);
 375        }
 376
 377        for (i = 0; i < request->num_sge; i++)
 378                ib_dma_unmap_single(request->info->id->device,
 379                        request->sge[i].addr,
 380                        request->sge[i].length,
 381                        DMA_TO_DEVICE);
 382
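             /*
              * Sends carrying payload and header-only sends are tracked on
              * separate pending counters so teardown can wait for each to drain
              */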
 383        if (request->has_payload) {
 384                if (atomic_dec_and_test(&request->info->send_payload_pending))
 385                        wake_up(&request->info->wait_send_payload_pending);
 386        } else {
 387                if (atomic_dec_and_test(&request->info->send_pending))
 388                        wake_up(&request->info->wait_send_pending);
 389        }
 390
 391        mempool_free(request, request->info->request_mempool);
 392}
 393
 394static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 395{
 396        log_rdma_event(INFO, "resp message min_version %u max_version %u "
 397                "negotiated_version %u credits_requested %u "
 398                "credits_granted %u status %u max_readwrite_size %u "
 399                "preferred_send_size %u max_receive_size %u "
 400                "max_fragmented_size %u\n",
 401                resp->min_version, resp->max_version, resp->negotiated_version,
 402                resp->credits_requested, resp->credits_granted, resp->status,
 403                resp->max_readwrite_size, resp->preferred_send_size,
 404                resp->max_receive_size, resp->max_fragmented_size);
 405}
 406
 407/*
  408 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
 409 * response, packet_length: the negotiation response message
 410 * return value: true if negotiation is a success, false if failed
 411 */
 412static bool process_negotiation_response(
 413                struct smbd_response *response, int packet_length)
 414{
 415        struct smbd_connection *info = response->info;
 416        struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 417
 418        if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 419                log_rdma_event(ERR,
 420                        "error: packet_length=%d\n", packet_length);
 421                return false;
 422        }
 423
 424        if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 425                log_rdma_event(ERR, "error: negotiated_version=%x\n",
 426                        le16_to_cpu(packet->negotiated_version));
 427                return false;
 428        }
 429        info->protocol = le16_to_cpu(packet->negotiated_version);
 430
 431        if (packet->credits_requested == 0) {
 432                log_rdma_event(ERR, "error: credits_requested==0\n");
 433                return false;
 434        }
 435        info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 436
 437        if (packet->credits_granted == 0) {
 438                log_rdma_event(ERR, "error: credits_granted==0\n");
 439                return false;
 440        }
 441        atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 442
 443        atomic_set(&info->receive_credits, 0);
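             /*
              * Receive credits start at zero; they are built up as receive
              * buffers are posted by smbd_post_send_credits()
              */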
 444
 445        if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 446                log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 447                        le32_to_cpu(packet->preferred_send_size));
 448                return false;
 449        }
 450        info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 451
 452        if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 453                log_rdma_event(ERR, "error: max_receive_size=%d\n",
 454                        le32_to_cpu(packet->max_receive_size));
 455                return false;
 456        }
 457        info->max_send_size = min_t(int, info->max_send_size,
 458                                        le32_to_cpu(packet->max_receive_size));
 459
 460        if (le32_to_cpu(packet->max_fragmented_size) <
 461                        SMBD_MIN_FRAGMENTED_SIZE) {
 462                log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 463                        le32_to_cpu(packet->max_fragmented_size));
 464                return false;
 465        }
 466        info->max_fragmented_send_size =
 467                le32_to_cpu(packet->max_fragmented_size);
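             /*
              * Payloads at or above this threshold go out via RDMA read/write
              * rather than SMBD send/recv; cap the threshold at the peer's
              * maximum fragmented size
              */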
 468        info->rdma_readwrite_threshold =
 469                rdma_readwrite_threshold > info->max_fragmented_send_size ?
 470                info->max_fragmented_send_size :
 471                rdma_readwrite_threshold;
 472
 473
 474        info->max_readwrite_size = min_t(u32,
 475                        le32_to_cpu(packet->max_readwrite_size),
 476                        info->max_frmr_depth * PAGE_SIZE);
 477        info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 478
 479        return true;
 480}
 481
 482/*
 483 * Check and schedule to send an immediate packet
  484 * This is used to extend credits to the remote peer to keep the transport busy
 485 */
 486static void check_and_send_immediate(struct smbd_connection *info)
 487{
 488        if (info->transport_status != SMBD_CONNECTED)
 489                return;
 490
 491        info->send_immediate = true;
 492
 493        /*
 494         * Promptly send a packet if our peer is running low on receive
 495         * credits
 496         */
 497        if (atomic_read(&info->receive_credits) <
 498                info->receive_credit_target - 1)
 499                queue_delayed_work(
 500                        info->workqueue, &info->send_immediate_work, 0);
 501}
 502
 503static void smbd_post_send_credits(struct work_struct *work)
 504{
 505        int ret = 0;
 506        int use_receive_queue = 1;
 507        int rc;
 508        struct smbd_response *response;
 509        struct smbd_connection *info =
 510                container_of(work, struct smbd_connection,
 511                        post_send_credits_work);
 512
 513        if (info->transport_status != SMBD_CONNECTED) {
 514                wake_up(&info->wait_receive_queues);
 515                return;
 516        }
 517
 518        if (info->receive_credit_target >
 519                atomic_read(&info->receive_credits)) {
 520                while (true) {
 521                        if (use_receive_queue)
 522                                response = get_receive_buffer(info);
 523                        else
 524                                response = get_empty_queue_buffer(info);
 525                        if (!response) {
  526                                /* now switch to empty packet queue */
 527                                if (use_receive_queue) {
 528                                        use_receive_queue = 0;
 529                                        continue;
 530                                } else
 531                                        break;
 532                        }
 533
 534                        response->type = SMBD_TRANSFER_DATA;
 535                        response->first_segment = false;
 536                        rc = smbd_post_recv(info, response);
 537                        if (rc) {
 538                                log_rdma_recv(ERR,
 539                                        "post_recv failed rc=%d\n", rc);
 540                                put_receive_buffer(info, response);
 541                                break;
 542                        }
 543
 544                        ret++;
 545                }
 546        }
 547
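             /*
              * Credits accumulated here are advertised to the peer in the
              * credits_granted field of the next outgoing packet header, see
              * manage_credits_prior_sending()
              */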
 548        spin_lock(&info->lock_new_credits_offered);
 549        info->new_credits_offered += ret;
 550        spin_unlock(&info->lock_new_credits_offered);
 551
 552        atomic_add(ret, &info->receive_credits);
 553
  554        /* Check if we can post new receives and grant credits to the peer */
 555        check_and_send_immediate(info);
 556}
 557
 558static void smbd_recv_done_work(struct work_struct *work)
 559{
 560        struct smbd_connection *info =
 561                container_of(work, struct smbd_connection, recv_done_work);
 562
 563        /*
 564         * We may have new send credits granted from remote peer
  565         * If any sender is blocked on lack of credits, unblock it
 566         */
 567        if (atomic_read(&info->send_credits))
 568                wake_up_interruptible(&info->wait_send_queue);
 569
 570        /*
 571         * Check if we need to send something to remote peer to
 572         * grant more credits or respond to KEEP_ALIVE packet
 573         */
 574        check_and_send_immediate(info);
 575}
 576
 577/* Called from softirq, when recv is done */
 578static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 579{
 580        struct smbd_data_transfer *data_transfer;
 581        struct smbd_response *response =
 582                container_of(wc->wr_cqe, struct smbd_response, cqe);
 583        struct smbd_connection *info = response->info;
 584        int data_length = 0;
 585
 586        log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d "
 587                      "byte_len=%d pkey_index=%x\n",
 588                response, response->type, wc->status, wc->opcode,
 589                wc->byte_len, wc->pkey_index);
 590
 591        if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 592                log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 593                        wc->status, wc->opcode);
 594                smbd_disconnect_rdma_connection(info);
 595                goto error;
 596        }
 597
 598        ib_dma_sync_single_for_cpu(
 599                wc->qp->device,
 600                response->sge.addr,
 601                response->sge.length,
 602                DMA_FROM_DEVICE);
 603
 604        switch (response->type) {
 605        /* SMBD negotiation response */
 606        case SMBD_NEGOTIATE_RESP:
 607                dump_smbd_negotiate_resp(smbd_response_payload(response));
 608                info->full_packet_received = true;
 609                info->negotiate_done =
 610                        process_negotiation_response(response, wc->byte_len);
 611                complete(&info->negotiate_completion);
 612                break;
 613
 614        /* SMBD data transfer packet */
 615        case SMBD_TRANSFER_DATA:
 616                data_transfer = smbd_response_payload(response);
 617                data_length = le32_to_cpu(data_transfer->data_length);
 618
 619                /*
  620                 * If this is a packet with data payload, place the data in the
 621                 * reassembly queue and wake up the reading thread
 622                 */
 623                if (data_length) {
 624                        if (info->full_packet_received)
 625                                response->first_segment = true;
 626
 627                        if (le32_to_cpu(data_transfer->remaining_data_length))
 628                                info->full_packet_received = false;
 629                        else
 630                                info->full_packet_received = true;
 631
 632                        enqueue_reassembly(
 633                                info,
 634                                response,
 635                                data_length);
 636                } else
 637                        put_empty_packet(info, response);
 638
 639                if (data_length)
 640                        wake_up_interruptible(&info->wait_reassembly_queue);
 641
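                     /*
                      * This receive consumed one credit; pick up the peer's new
                      * credit target and any send credits it granted us
                      */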
 642                atomic_dec(&info->receive_credits);
 643                info->receive_credit_target =
 644                        le16_to_cpu(data_transfer->credits_requested);
 645                atomic_add(le16_to_cpu(data_transfer->credits_granted),
 646                        &info->send_credits);
 647
 648                log_incoming(INFO, "data flags %d data_offset %d "
 649                        "data_length %d remaining_data_length %d\n",
 650                        le16_to_cpu(data_transfer->flags),
 651                        le32_to_cpu(data_transfer->data_offset),
 652                        le32_to_cpu(data_transfer->data_length),
 653                        le32_to_cpu(data_transfer->remaining_data_length));
 654
 655                /* Send a KEEP_ALIVE response right away if requested */
 656                info->keep_alive_requested = KEEP_ALIVE_NONE;
 657                if (le16_to_cpu(data_transfer->flags) &
 658                                SMB_DIRECT_RESPONSE_REQUESTED) {
 659                        info->keep_alive_requested = KEEP_ALIVE_PENDING;
 660                }
 661
 662                queue_work(info->workqueue, &info->recv_done_work);
 663                return;
 664
 665        default:
 666                log_rdma_recv(ERR,
 667                        "unexpected response type=%d\n", response->type);
 668        }
 669
 670error:
 671        put_receive_buffer(info, response);
 672}
 673
 674static struct rdma_cm_id *smbd_create_id(
 675                struct smbd_connection *info,
 676                struct sockaddr *dstaddr, int port)
 677{
 678        struct rdma_cm_id *id;
 679        int rc;
 680        __be16 *sport;
 681
 682        id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 683                RDMA_PS_TCP, IB_QPT_RC);
 684        if (IS_ERR(id)) {
 685                rc = PTR_ERR(id);
 686                log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 687                return id;
 688        }
 689
 690        if (dstaddr->sa_family == AF_INET6)
 691                sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 692        else
 693                sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 694
 695        *sport = htons(port);
 696
 697        init_completion(&info->ri_done);
 698        info->ri_rc = -ETIMEDOUT;
 699
 700        rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 701                RDMA_RESOLVE_TIMEOUT);
 702        if (rc) {
 703                log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 704                goto out;
 705        }
 706        wait_for_completion_interruptible_timeout(
 707                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 708        rc = info->ri_rc;
 709        if (rc) {
 710                log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 711                goto out;
 712        }
 713
 714        info->ri_rc = -ETIMEDOUT;
 715        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 716        if (rc) {
 717                log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 718                goto out;
 719        }
 720        wait_for_completion_interruptible_timeout(
 721                &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 722        rc = info->ri_rc;
 723        if (rc) {
 724                log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 725                goto out;
 726        }
 727
 728        return id;
 729
 730out:
 731        rdma_destroy_id(id);
 732        return ERR_PTR(rc);
 733}
 734
 735/*
 736 * Test if FRWR (Fast Registration Work Requests) is supported on the device
  737 * This implementation requires FRWR for RDMA read/write
 738 * return value: true if it is supported
 739 */
 740static bool frwr_is_supported(struct ib_device_attr *attrs)
 741{
 742        if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 743                return false;
 744        if (attrs->max_fast_reg_page_list_len == 0)
 745                return false;
 746        return true;
 747}
 748
 749static int smbd_ia_open(
 750                struct smbd_connection *info,
 751                struct sockaddr *dstaddr, int port)
 752{
 753        int rc;
 754
 755        info->id = smbd_create_id(info, dstaddr, port);
 756        if (IS_ERR(info->id)) {
 757                rc = PTR_ERR(info->id);
 758                goto out1;
 759        }
 760
 761        if (!frwr_is_supported(&info->id->device->attrs)) {
 762                log_rdma_event(ERR,
 763                        "Fast Registration Work Requests "
 764                        "(FRWR) is not supported\n");
 765                log_rdma_event(ERR,
 766                        "Device capability flags = %llx "
 767                        "max_fast_reg_page_list_len = %u\n",
 768                        info->id->device->attrs.device_cap_flags,
 769                        info->id->device->attrs.max_fast_reg_page_list_len);
 770                rc = -EPROTONOSUPPORT;
 771                goto out2;
 772        }
 773        info->max_frmr_depth = min_t(int,
 774                smbd_max_frmr_depth,
 775                info->id->device->attrs.max_fast_reg_page_list_len);
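             /*
              * Prefer SG_GAPS memory regions when the device supports them, so
              * scatter/gather lists with gaps can be registered in a single MR
              */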
 776        info->mr_type = IB_MR_TYPE_MEM_REG;
 777        if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 778                info->mr_type = IB_MR_TYPE_SG_GAPS;
 779
 780        info->pd = ib_alloc_pd(info->id->device, 0);
 781        if (IS_ERR(info->pd)) {
 782                rc = PTR_ERR(info->pd);
 783                log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 784                goto out2;
 785        }
 786
 787        return 0;
 788
 789out2:
 790        rdma_destroy_id(info->id);
 791        info->id = NULL;
 792
 793out1:
 794        return rc;
 795}
 796
 797/*
 798 * Send a negotiation request message to the peer
 799 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 800 * After negotiation, the transport is connected and ready for
 801 * carrying upper layer SMB payload
 802 */
 803static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 804{
 805        struct ib_send_wr send_wr;
 806        int rc = -ENOMEM;
 807        struct smbd_request *request;
 808        struct smbd_negotiate_req *packet;
 809
 810        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 811        if (!request)
 812                return rc;
 813
 814        request->info = info;
 815
 816        packet = smbd_request_payload(request);
 817        packet->min_version = cpu_to_le16(SMBD_V1);
 818        packet->max_version = cpu_to_le16(SMBD_V1);
 819        packet->reserved = 0;
 820        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 821        packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 822        packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 823        packet->max_fragmented_size =
 824                cpu_to_le32(info->max_fragmented_recv_size);
 825
 826        request->num_sge = 1;
 827        request->sge[0].addr = ib_dma_map_single(
 828                                info->id->device, (void *)packet,
 829                                sizeof(*packet), DMA_TO_DEVICE);
 830        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 831                rc = -EIO;
 832                goto dma_mapping_failed;
 833        }
 834
 835        request->sge[0].length = sizeof(*packet);
 836        request->sge[0].lkey = info->pd->local_dma_lkey;
 837
 838        ib_dma_sync_single_for_device(
 839                info->id->device, request->sge[0].addr,
 840                request->sge[0].length, DMA_TO_DEVICE);
 841
 842        request->cqe.done = send_done;
 843
 844        send_wr.next = NULL;
 845        send_wr.wr_cqe = &request->cqe;
 846        send_wr.sg_list = request->sge;
 847        send_wr.num_sge = request->num_sge;
 848        send_wr.opcode = IB_WR_SEND;
 849        send_wr.send_flags = IB_SEND_SIGNALED;
 850
 851        log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 852                request->sge[0].addr,
 853                request->sge[0].length, request->sge[0].lkey);
 854
 855        request->has_payload = false;
 856        atomic_inc(&info->send_pending);
 857        rc = ib_post_send(info->id->qp, &send_wr, NULL);
 858        if (!rc)
 859                return 0;
 860
 861        /* if we reach here, post send failed */
 862        log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 863        atomic_dec(&info->send_pending);
 864        ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 865                request->sge[0].length, DMA_TO_DEVICE);
 866
 867        smbd_disconnect_rdma_connection(info);
 868
 869dma_mapping_failed:
 870        mempool_free(request, info->request_mempool);
 871        return rc;
 872}
 873
 874/*
 875 * Extend the credits to remote peer
 876 * This implements [MS-SMBD] 3.1.5.9
 877 * The idea is that we should extend credits to remote peer as quickly as
 878 * it's allowed, to maintain data flow. We allocate as much receive
 879 * buffer as possible, and extend the receive credits to remote peer
  880 * return value: the new credits being granted.
 881 */
 882static int manage_credits_prior_sending(struct smbd_connection *info)
 883{
 884        int new_credits;
 885
 886        spin_lock(&info->lock_new_credits_offered);
 887        new_credits = info->new_credits_offered;
 888        info->new_credits_offered = 0;
 889        spin_unlock(&info->lock_new_credits_offered);
 890
 891        return new_credits;
 892}
 893
 894/*
 895 * Check if we need to send a KEEP_ALIVE message
  896 * The idle connection timer triggers a KEEP_ALIVE message when it expires
  897 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flags to have the peer send
 898 * back a response.
 899 * return value:
 900 * 1 if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 901 * 0: otherwise
 902 */
 903static int manage_keep_alive_before_sending(struct smbd_connection *info)
 904{
 905        if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 906                info->keep_alive_requested = KEEP_ALIVE_SENT;
 907                return 1;
 908        }
 909        return 0;
 910}
 911
 912/*
 913 * Build and prepare the SMBD packet header
  914 * This function waits for available send credits and builds an SMBD packet
  915 * header. The caller may then optionally append a payload to the packet after
  916 * the header
  917 * input values
 918 * size: the size of the payload
 919 * remaining_data_length: remaining data to send if this is part of a
 920 * fragmented packet
 921 * output values
 922 * request_out: the request allocated from this function
 923 * return values: 0 on success, otherwise actual error code returned
 924 */
 925static int smbd_create_header(struct smbd_connection *info,
 926                int size, int remaining_data_length,
 927                struct smbd_request **request_out)
 928{
 929        struct smbd_request *request;
 930        struct smbd_data_transfer *packet;
 931        int header_length;
 932        int rc;
 933
 934        /* Wait for send credits. A SMBD packet needs one credit */
 935        rc = wait_event_interruptible(info->wait_send_queue,
 936                atomic_read(&info->send_credits) > 0 ||
 937                info->transport_status != SMBD_CONNECTED);
 938        if (rc)
 939                return rc;
 940
 941        if (info->transport_status != SMBD_CONNECTED) {
 942                log_outgoing(ERR, "disconnected not sending\n");
 943                return -ENOENT;
 944        }
 945        atomic_dec(&info->send_credits);
 946
 947        request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 948        if (!request) {
 949                rc = -ENOMEM;
 950                goto err;
 951        }
 952
 953        request->info = info;
 954
 955        /* Fill in the packet header */
 956        packet = smbd_request_payload(request);
 957        packet->credits_requested = cpu_to_le16(info->send_credit_target);
 958        packet->credits_granted =
 959                cpu_to_le16(manage_credits_prior_sending(info));
 960        info->send_immediate = false;
 961
 962        packet->flags = 0;
 963        if (manage_keep_alive_before_sending(info))
 964                packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 965
 966        packet->reserved = 0;
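             /*
              * A non-empty payload starts immediately after the 24-byte
              * smbd_data_transfer header, hence data_offset of 24
              */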
 967        if (!size)
 968                packet->data_offset = 0;
 969        else
 970                packet->data_offset = cpu_to_le32(24);
 971        packet->data_length = cpu_to_le32(size);
 972        packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 973        packet->padding = 0;
 974
 975        log_outgoing(INFO, "credits_requested=%d credits_granted=%d "
 976                "data_offset=%d data_length=%d remaining_data_length=%d\n",
 977                le16_to_cpu(packet->credits_requested),
 978                le16_to_cpu(packet->credits_granted),
 979                le32_to_cpu(packet->data_offset),
 980                le32_to_cpu(packet->data_length),
 981                le32_to_cpu(packet->remaining_data_length));
 982
 983        /* Map the packet to DMA */
 984        header_length = sizeof(struct smbd_data_transfer);
 985        /* If this is a packet without payload, don't send padding */
 986        if (!size)
 987                header_length = offsetof(struct smbd_data_transfer, padding);
 988
 989        request->num_sge = 1;
 990        request->sge[0].addr = ib_dma_map_single(info->id->device,
 991                                                 (void *)packet,
 992                                                 header_length,
 993                                                 DMA_BIDIRECTIONAL);
 994        if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 995                mempool_free(request, info->request_mempool);
 996                rc = -EIO;
 997                goto err;
 998        }
 999
1000        request->sge[0].length = header_length;
1001        request->sge[0].lkey = info->pd->local_dma_lkey;
1002
1003        *request_out = request;
1004        return 0;
1005
1006err:
1007        atomic_inc(&info->send_credits);
1008        return rc;
1009}
1010
1011static void smbd_destroy_header(struct smbd_connection *info,
1012                struct smbd_request *request)
1013{
1014
1015        ib_dma_unmap_single(info->id->device,
1016                            request->sge[0].addr,
1017                            request->sge[0].length,
1018                            DMA_TO_DEVICE);
1019        mempool_free(request, info->request_mempool);
1020        atomic_inc(&info->send_credits);
1021}
1022
1023/* Post the send request */
1024static int smbd_post_send(struct smbd_connection *info,
1025                struct smbd_request *request, bool has_payload)
1026{
1027        struct ib_send_wr send_wr;
1028        int rc, i;
1029
1030        for (i = 0; i < request->num_sge; i++) {
1031                log_rdma_send(INFO,
1032                        "rdma_request sge[%d] addr=%llu length=%u\n",
1033                        i, request->sge[i].addr, request->sge[i].length);
1034                ib_dma_sync_single_for_device(
1035                        info->id->device,
1036                        request->sge[i].addr,
1037                        request->sge[i].length,
1038                        DMA_TO_DEVICE);
1039        }
1040
1041        request->cqe.done = send_done;
1042
1043        send_wr.next = NULL;
1044        send_wr.wr_cqe = &request->cqe;
1045        send_wr.sg_list = request->sge;
1046        send_wr.num_sge = request->num_sge;
1047        send_wr.opcode = IB_WR_SEND;
1048        send_wr.send_flags = IB_SEND_SIGNALED;
1049
1050        if (has_payload) {
1051                request->has_payload = true;
1052                atomic_inc(&info->send_payload_pending);
1053        } else {
1054                request->has_payload = false;
1055                atomic_inc(&info->send_pending);
1056        }
1057
1058        rc = ib_post_send(info->id->qp, &send_wr, NULL);
1059        if (rc) {
1060                log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
1061                if (has_payload) {
1062                        if (atomic_dec_and_test(&info->send_payload_pending))
1063                                wake_up(&info->wait_send_payload_pending);
1064                } else {
1065                        if (atomic_dec_and_test(&info->send_pending))
1066                                wake_up(&info->wait_send_pending);
1067                }
1068                smbd_disconnect_rdma_connection(info);
1069        } else
1070                /* Reset timer for idle connection after packet is sent */
1071                mod_delayed_work(info->workqueue, &info->idle_timer_work,
1072                        info->keep_alive_interval*HZ);
1073
1074        return rc;
1075}
1076
1077static int smbd_post_send_sgl(struct smbd_connection *info,
1078        struct scatterlist *sgl, int data_length, int remaining_data_length)
1079{
1080        int num_sgs;
1081        int i, rc;
1082        struct smbd_request *request;
1083        struct scatterlist *sg;
1084
1085        rc = smbd_create_header(
1086                info, data_length, remaining_data_length, &request);
1087        if (rc)
1088                return rc;
1089
1090        num_sgs = sgl ? sg_nents(sgl) : 0;
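             /* sge[0] was set up by smbd_create_header(); map the payload into sge[1..] */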
1091        for_each_sg(sgl, sg, num_sgs, i) {
1092                request->sge[i+1].addr =
1093                        ib_dma_map_page(info->id->device, sg_page(sg),
1094                               sg->offset, sg->length, DMA_BIDIRECTIONAL);
1095                if (ib_dma_mapping_error(
1096                                info->id->device, request->sge[i+1].addr)) {
1097                        rc = -EIO;
1098                        request->sge[i+1].addr = 0;
1099                        goto dma_mapping_failure;
1100                }
1101                request->sge[i+1].length = sg->length;
1102                request->sge[i+1].lkey = info->pd->local_dma_lkey;
1103                request->num_sge++;
1104        }
1105
1106        rc = smbd_post_send(info, request, data_length);
1107        if (!rc)
1108                return 0;
1109
1110dma_mapping_failure:
1111        for (i = 1; i < request->num_sge; i++)
1112                if (request->sge[i].addr)
1113                        ib_dma_unmap_single(info->id->device,
1114                                            request->sge[i].addr,
1115                                            request->sge[i].length,
1116                                            DMA_TO_DEVICE);
1117        smbd_destroy_header(info, request);
1118        return rc;
1119}
1120
1121/*
1122 * Send a page
1123 * page: the page to send
1124 * offset: offset in the page to send
1125 * size: length in the page to send
1126 * remaining_data_length: remaining data to send in this payload
1127 */
1128static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
1129                unsigned long offset, size_t size, int remaining_data_length)
1130{
1131        struct scatterlist sgl;
1132
1133        sg_init_table(&sgl, 1);
1134        sg_set_page(&sgl, page, size, offset);
1135
1136        return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
1137}
1138
1139/*
1140 * Send an empty message
 1141 * Empty messages are used to extend credits to the peer and for keep-alive
1142 * while there is no upper layer payload to send at the time
1143 */
1144static int smbd_post_send_empty(struct smbd_connection *info)
1145{
1146        info->count_send_empty++;
1147        return smbd_post_send_sgl(info, NULL, 0, 0);
1148}
1149
1150/*
1151 * Send a data buffer
1152 * iov: the iov array describing the data buffers
 1153 * n_vec: number of entries in the iov array
1154 * remaining_data_length: remaining data to send following this packet
1155 * in segmented SMBD packet
1156 */
1157static int smbd_post_send_data(
1158        struct smbd_connection *info, struct kvec *iov, int n_vec,
1159        int remaining_data_length)
1160{
1161        int i;
1162        u32 data_length = 0;
1163        struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1164
1165        if (n_vec > SMBDIRECT_MAX_SGE) {
1166                cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1167                return -ENOMEM;
1168        }
1169
1170        sg_init_table(sgl, n_vec);
1171        for (i = 0; i < n_vec; i++) {
1172                data_length += iov[i].iov_len;
1173                sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1174        }
1175
1176        return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1177}
1178
1179/*
1180 * Post a receive request to the transport
1181 * The remote peer can only send data when a receive request is posted
 1182 * The interaction is controlled by the send/receive credit system
1183 */
1184static int smbd_post_recv(
1185                struct smbd_connection *info, struct smbd_response *response)
1186{
1187        struct ib_recv_wr recv_wr;
1188        int rc = -EIO;
1189
1190        response->sge.addr = ib_dma_map_single(
1191                                info->id->device, response->packet,
1192                                info->max_receive_size, DMA_FROM_DEVICE);
1193        if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1194                return rc;
1195
1196        response->sge.length = info->max_receive_size;
1197        response->sge.lkey = info->pd->local_dma_lkey;
1198
1199        response->cqe.done = recv_done;
1200
1201        recv_wr.wr_cqe = &response->cqe;
1202        recv_wr.next = NULL;
1203        recv_wr.sg_list = &response->sge;
1204        recv_wr.num_sge = 1;
1205
1206        rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
1207        if (rc) {
1208                ib_dma_unmap_single(info->id->device, response->sge.addr,
1209                                    response->sge.length, DMA_FROM_DEVICE);
1210                smbd_disconnect_rdma_connection(info);
1211                log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1212        }
1213
1214        return rc;
1215}
1216
1217/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1218static int smbd_negotiate(struct smbd_connection *info)
1219{
1220        int rc;
1221        struct smbd_response *response = get_receive_buffer(info);
1222
1223        response->type = SMBD_NEGOTIATE_RESP;
1224        rc = smbd_post_recv(info, response);
1225        log_rdma_event(INFO,
1226                "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x "
1227                "iov.lkey=%x\n",
1228                rc, response->sge.addr,
1229                response->sge.length, response->sge.lkey);
1230        if (rc)
1231                return rc;
1232
1233        init_completion(&info->negotiate_completion);
1234        info->negotiate_done = false;
1235        rc = smbd_post_send_negotiate_req(info);
1236        if (rc)
1237                return rc;
1238
1239        rc = wait_for_completion_interruptible_timeout(
1240                &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1241        log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1242
1243        if (info->negotiate_done)
1244                return 0;
1245
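             /*
              * wait_for_completion_interruptible_timeout() returns 0 on timeout
              * and -ERESTARTSYS when interrupted; any other value means the wait
              * completed but the negotiation did not succeed
              */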
1246        if (rc == 0)
1247                rc = -ETIMEDOUT;
1248        else if (rc == -ERESTARTSYS)
1249                rc = -EINTR;
1250        else
1251                rc = -ENOTCONN;
1252
1253        return rc;
1254}
1255
1256static void put_empty_packet(
1257                struct smbd_connection *info, struct smbd_response *response)
1258{
1259        spin_lock(&info->empty_packet_queue_lock);
1260        list_add_tail(&response->list, &info->empty_packet_queue);
1261        info->count_empty_packet_queue++;
1262        spin_unlock(&info->empty_packet_queue_lock);
1263
1264        queue_work(info->workqueue, &info->post_send_credits_work);
1265}
1266
1267/*
1268 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
 1269 * This is a queue for reassembling upper layer payload and presenting it to
 1270 * the upper layer. All incoming payloads go to the reassembly queue, regardless
 1271 * of whether reassembly is required. The upper layer code reads from the queue for all
1272 * incoming payloads.
1273 * Put a received packet to the reassembly queue
1274 * response: the packet received
1275 * data_length: the size of payload in this packet
1276 */
1277static void enqueue_reassembly(
1278        struct smbd_connection *info,
1279        struct smbd_response *response,
1280        int data_length)
1281{
1282        spin_lock(&info->reassembly_queue_lock);
1283        list_add_tail(&response->list, &info->reassembly_queue);
1284        info->reassembly_queue_length++;
1285        /*
1286         * Make sure reassembly_data_length is updated after list and
1287         * reassembly_queue_length are updated. On the dequeue side
1288         * reassembly_data_length is checked without a lock to determine
 1289         * if reassembly_queue_length and the list are up to date
1290         */
1291        virt_wmb();
1292        info->reassembly_data_length += data_length;
1293        spin_unlock(&info->reassembly_queue_lock);
1294        info->count_reassembly_queue++;
1295        info->count_enqueue_reassembly_queue++;
1296}
1297
1298/*
1299 * Get the first entry at the front of reassembly queue
1300 * Caller is responsible for locking
1301 * return value: the first entry if any, NULL if queue is empty
1302 */
1303static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1304{
1305        struct smbd_response *ret = NULL;
1306
1307        if (!list_empty(&info->reassembly_queue)) {
1308                ret = list_first_entry(
1309                        &info->reassembly_queue,
1310                        struct smbd_response, list);
1311        }
1312        return ret;
1313}
1314
1315static struct smbd_response *get_empty_queue_buffer(
1316                struct smbd_connection *info)
1317{
1318        struct smbd_response *ret = NULL;
1319        unsigned long flags;
1320
1321        spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1322        if (!list_empty(&info->empty_packet_queue)) {
1323                ret = list_first_entry(
1324                        &info->empty_packet_queue,
1325                        struct smbd_response, list);
1326                list_del(&ret->list);
1327                info->count_empty_packet_queue--;
1328        }
1329        spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1330
1331        return ret;
1332}
1333
1334/*
1335 * Get a receive buffer
1336 * For each remote send, we need to post a receive. The receive buffers are
 1337 * pre-allocated on transport establishment.
1338 * return value: the receive buffer, NULL if none is available
1339 */
1340static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1341{
1342        struct smbd_response *ret = NULL;
1343        unsigned long flags;
1344
1345        spin_lock_irqsave(&info->receive_queue_lock, flags);
1346        if (!list_empty(&info->receive_queue)) {
1347                ret = list_first_entry(
1348                        &info->receive_queue,
1349                        struct smbd_response, list);
1350                list_del(&ret->list);
1351                info->count_receive_queue--;
1352                info->count_get_receive_buffer++;
1353        }
1354        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1355
1356        return ret;
1357}
1358
1359/*
1360 * Return a receive buffer
 1361 * Upon return of a receive buffer, we can post a new receive and extend
1362 * more receive credits to remote peer. This is done immediately after a
1363 * receive buffer is returned.
1364 */
1365static void put_receive_buffer(
1366        struct smbd_connection *info, struct smbd_response *response)
1367{
1368        unsigned long flags;
1369
1370        ib_dma_unmap_single(info->id->device, response->sge.addr,
1371                response->sge.length, DMA_FROM_DEVICE);
1372
1373        spin_lock_irqsave(&info->receive_queue_lock, flags);
1374        list_add_tail(&response->list, &info->receive_queue);
1375        info->count_receive_queue++;
1376        info->count_put_receive_buffer++;
1377        spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1378
1379        queue_work(info->workqueue, &info->post_send_credits_work);
1380}
1381
 1382/* Preallocate all receive buffers on transport establishment */
1383static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1384{
1385        int i;
1386        struct smbd_response *response;
1387
1388        INIT_LIST_HEAD(&info->reassembly_queue);
1389        spin_lock_init(&info->reassembly_queue_lock);
1390        info->reassembly_data_length = 0;
1391        info->reassembly_queue_length = 0;
1392
1393        INIT_LIST_HEAD(&info->receive_queue);
1394        spin_lock_init(&info->receive_queue_lock);
1395        info->count_receive_queue = 0;
1396
1397        INIT_LIST_HEAD(&info->empty_packet_queue);
1398        spin_lock_init(&info->empty_packet_queue_lock);
1399        info->count_empty_packet_queue = 0;
1400
1401        init_waitqueue_head(&info->wait_receive_queues);
1402
1403        for (i = 0; i < num_buf; i++) {
1404                response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1405                if (!response)
1406                        goto allocate_failed;
1407
1408                response->info = info;
1409                list_add_tail(&response->list, &info->receive_queue);
1410                info->count_receive_queue++;
1411        }
1412
1413        return 0;
1414
1415allocate_failed:
1416        while (!list_empty(&info->receive_queue)) {
1417                response = list_first_entry(
1418                                &info->receive_queue,
1419                                struct smbd_response, list);
1420                list_del(&response->list);
1421                info->count_receive_queue--;
1422
1423                mempool_free(response, info->response_mempool);
1424        }
1425        return -ENOMEM;
1426}
1427
1428static void destroy_receive_buffers(struct smbd_connection *info)
1429{
1430        struct smbd_response *response;
1431
1432        while ((response = get_receive_buffer(info)))
1433                mempool_free(response, info->response_mempool);
1434
1435        while ((response = get_empty_queue_buffer(info)))
1436                mempool_free(response, info->response_mempool);
1437}
1438
1439/*
1440 * Check and send an immediate or keep alive packet
1441 * The conditions for sending those packets are defined in [MS-SMBD] 3.1.1.1
1442 * Connection.KeepaliveRequested and Connection.SendImmediate
1443 * The idea is to extend credits to the server as soon as they become available
1444 */
1445static void send_immediate_work(struct work_struct *work)
1446{
1447        struct smbd_connection *info = container_of(
1448                                        work, struct smbd_connection,
1449                                        send_immediate_work.work);
1450
1451        if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
1452            info->send_immediate) {
1453                log_keep_alive(INFO, "send an empty message\n");
1454                smbd_post_send_empty(info);
1455        }
1456}
1457
1458/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1459static void idle_connection_timer(struct work_struct *work)
1460{
1461        struct smbd_connection *info = container_of(
1462                                        work, struct smbd_connection,
1463                                        idle_timer_work.work);
1464
1465        if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1466                log_keep_alive(ERR,
1467                        "error status info->keep_alive_requested=%d\n",
1468                        info->keep_alive_requested);
1469                smbd_disconnect_rdma_connection(info);
1470                return;
1471        }
1472
1473        log_keep_alive(INFO, "about to send an empty idle message\n");
1474        smbd_post_send_empty(info);
1475
1476        /* Set up the next idle timeout work */
1477        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1478                        info->keep_alive_interval*HZ);
1479}
1480
1481/* Destroy this SMBD connection, called from upper layer */
1482void smbd_destroy(struct smbd_connection *info)
1483{
1484        log_rdma_event(INFO, "destroying rdma session\n");
1485
1486        /* Kick off the disconnection process */
1487        smbd_disconnect_rdma_connection(info);
1488
1489        log_rdma_event(INFO, "wait for transport being destroyed\n");
1490        wait_event(info->wait_destroy,
1491                info->transport_status == SMBD_DESTROYED);
1492
1493        destroy_workqueue(info->workqueue);
1494        kfree(info);
1495}
1496
1497/*
1498 * Reconnect this SMBD connection, called from upper layer
1499 * return value: 0 on success, or actual error code
1500 */
1501int smbd_reconnect(struct TCP_Server_Info *server)
1502{
1503        log_rdma_event(INFO, "reconnecting rdma session\n");
1504
1505        if (!server->smbd_conn) {
1506                log_rdma_event(INFO, "rdma session already destroyed\n");
1507                goto create_conn;
1508        }
1509
1510        /*
1511         * This is possible if the transport is disconnected and we haven't
1512         * received a notification from RDMA, but the upper layer has detected a timeout
1513         */
1514        if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1515                log_rdma_event(INFO, "disconnecting transport\n");
1516                smbd_disconnect_rdma_connection(server->smbd_conn);
1517        }
1518
1519        /* wait until the transport is destroyed */
1520        if (!wait_event_timeout(server->smbd_conn->wait_destroy,
1521                server->smbd_conn->transport_status == SMBD_DESTROYED, 5*HZ))
1522                return -EAGAIN;
1523
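        /*
         * The old transport has been destroyed; release its workqueue and
         * connection structure before creating a new one.
         */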
1524        destroy_workqueue(server->smbd_conn->workqueue);
1525        kfree(server->smbd_conn);
1526
1527create_conn:
1528        log_rdma_event(INFO, "creating rdma session\n");
1529        server->smbd_conn = smbd_get_connection(
1530                server, (struct sockaddr *) &server->dstaddr);
1531        log_rdma_event(INFO, "created rdma session info=%p\n",
1532                server->smbd_conn);
1533
1534        return server->smbd_conn ? 0 : -ENOENT;
1535}
1536
1537static void destroy_caches_and_workqueue(struct smbd_connection *info)
1538{
1539        destroy_receive_buffers(info);
1540        destroy_workqueue(info->workqueue);
1541        mempool_destroy(info->response_mempool);
1542        kmem_cache_destroy(info->response_cache);
1543        mempool_destroy(info->request_mempool);
1544        kmem_cache_destroy(info->request_cache);
1545}
1546
1547#define MAX_NAME_LEN    80
1548static int allocate_caches_and_workqueue(struct smbd_connection *info)
1549{
1550        char name[MAX_NAME_LEN];
1551        int rc;
1552
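        /*
         * Each request slab object holds an smbd_request plus the SMBD data
         * transfer header; each response slab object holds an smbd_response
         * plus up to max_receive_size bytes of packet data. The mempools are
         * sized by the send/receive credit limits, presumably so every
         * outstanding credit has a buffer backing it.
         */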
1553        snprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1554        info->request_cache =
1555                kmem_cache_create(
1556                        name,
1557                        sizeof(struct smbd_request) +
1558                                sizeof(struct smbd_data_transfer),
1559                        0, SLAB_HWCACHE_ALIGN, NULL);
1560        if (!info->request_cache)
1561                return -ENOMEM;
1562
1563        info->request_mempool =
1564                mempool_create(info->send_credit_target, mempool_alloc_slab,
1565                        mempool_free_slab, info->request_cache);
1566        if (!info->request_mempool)
1567                goto out1;
1568
1569        snprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1570        info->response_cache =
1571                kmem_cache_create(
1572                        name,
1573                        sizeof(struct smbd_response) +
1574                                info->max_receive_size,
1575                        0, SLAB_HWCACHE_ALIGN, NULL);
1576        if (!info->response_cache)
1577                goto out2;
1578
1579        info->response_mempool =
1580                mempool_create(info->receive_credit_max, mempool_alloc_slab,
1581                       mempool_free_slab, info->response_cache);
1582        if (!info->response_mempool)
1583                goto out3;
1584
1585        snprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1586        info->workqueue = create_workqueue(name);
1587        if (!info->workqueue)
1588                goto out4;
1589
1590        rc = allocate_receive_buffers(info, info->receive_credit_max);
1591        if (rc) {
1592                log_rdma_event(ERR, "failed to allocate receive buffers\n");
1593                goto out5;
1594        }
1595
1596        return 0;
1597
1598out5:
1599        destroy_workqueue(info->workqueue);
1600out4:
1601        mempool_destroy(info->response_mempool);
1602out3:
1603        kmem_cache_destroy(info->response_cache);
1604out2:
1605        mempool_destroy(info->request_mempool);
1606out1:
1607        kmem_cache_destroy(info->request_cache);
1608        return -ENOMEM;
1609}
1610
1611/* Create a SMBD connection, called by upper layer */
1612static struct smbd_connection *_smbd_get_connection(
1613        struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1614{
1615        int rc;
1616        struct smbd_connection *info;
1617        struct rdma_conn_param conn_param;
1618        struct ib_qp_init_attr qp_attr;
1619        struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1620        struct ib_port_immutable port_immutable;
1621        u32 ird_ord_hdr[2];
1622
1623        info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1624        if (!info)
1625                return NULL;
1626
1627        info->transport_status = SMBD_CONNECTING;
1628        rc = smbd_ia_open(info, dstaddr, port);
1629        if (rc) {
1630                log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1631                goto create_id_failed;
1632        }
1633
1634        if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1635            smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1636                log_rdma_event(ERR,
1637                        "consider lowering send_credit_target = %d. "
1638                        "Possible CQE overrun, device "
1639                        "reporting max_cqe %d max_qp_wr %d\n",
1640                        smbd_send_credit_target,
1641                        info->id->device->attrs.max_cqe,
1642                        info->id->device->attrs.max_qp_wr);
1643                goto config_failed;
1644        }
1645
1646        if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1647            smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1648                log_rdma_event(ERR,
1649                        "consider lowering receive_credit_max = %d. "
1650                        "Possible CQE overrun, device "
1651                        "reporting max_cqe %d max_qp_wr %d\n",
1652                        smbd_receive_credit_max,
1653                        info->id->device->attrs.max_cqe,
1654                        info->id->device->attrs.max_qp_wr);
1655                goto config_failed;
1656        }
1657
1658        info->receive_credit_max = smbd_receive_credit_max;
1659        info->send_credit_target = smbd_send_credit_target;
1660        info->max_send_size = smbd_max_send_size;
1661        info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1662        info->max_receive_size = smbd_max_receive_size;
1663        info->keep_alive_interval = smbd_keep_alive_interval;
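        /*
         * The per-connection limits above are seeded from the smbd_* module
         * parameters; they are presumably the values offered to the peer
         * later in smbd_negotiate().
         */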
1664
1665        if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
1666                log_rdma_event(ERR,
1667                        "warning: device max_send_sge = %d too small\n",
1668                        info->id->device->attrs.max_send_sge);
1669                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1670        }
1671        if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
1672                log_rdma_event(ERR,
1673                        "warning: device max_recv_sge = %d too small\n",
1674                        info->id->device->attrs.max_recv_sge);
1675                log_rdma_event(ERR, "Queue Pair creation may fail\n");
1676        }
1677
1678        info->send_cq = NULL;
1679        info->recv_cq = NULL;
1680        info->send_cq = ib_alloc_cq(info->id->device, info,
1681                        info->send_credit_target, 0, IB_POLL_SOFTIRQ);
1682        if (IS_ERR(info->send_cq)) {
1683                info->send_cq = NULL;
1684                goto alloc_cq_failed;
1685        }
1686
1687        info->recv_cq = ib_alloc_cq(info->id->device, info,
1688                        info->receive_credit_max, 0, IB_POLL_SOFTIRQ);
1689        if (IS_ERR(info->recv_cq)) {
1690                info->recv_cq = NULL;
1691                goto alloc_cq_failed;
1692        }
1693
1694        memset(&qp_attr, 0, sizeof(qp_attr));
1695        qp_attr.event_handler = smbd_qp_async_error_upcall;
1696        qp_attr.qp_context = info;
1697        qp_attr.cap.max_send_wr = info->send_credit_target;
1698        qp_attr.cap.max_recv_wr = info->receive_credit_max;
1699        qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1700        qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1701        qp_attr.cap.max_inline_data = 0;
1702        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1703        qp_attr.qp_type = IB_QPT_RC;
1704        qp_attr.send_cq = info->send_cq;
1705        qp_attr.recv_cq = info->recv_cq;
1706        qp_attr.port_num = ~0;
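        /*
         * The QP is a reliable-connected (RC) QP. Its send/receive queue
         * depths mirror the credit limits: one send WR per send credit and
         * one receive WR per receive credit, each with up to
         * SMBDIRECT_MAX_SGE scatter/gather entries.
         */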
1707
1708        rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1709        if (rc) {
1710                log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1711                goto create_qp_failed;
1712        }
1713
1714        memset(&conn_param, 0, sizeof(conn_param));
1715        conn_param.initiator_depth = 0;
1716
1717        conn_param.responder_resources =
1718                info->id->device->attrs.max_qp_rd_atom
1719                        < SMBD_CM_RESPONDER_RESOURCES ?
1720                info->id->device->attrs.max_qp_rd_atom :
1721                SMBD_CM_RESPONDER_RESOURCES;
1722        info->responder_resources = conn_param.responder_resources;
1723        log_rdma_mr(INFO, "responder_resources=%d\n",
1724                info->responder_resources);
1725
1726        /* Need to send IRD/ORD in private data for iWARP */
1727        info->id->device->get_port_immutable(
1728                info->id->device, info->id->port_num, &port_immutable);
1729        if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1730                ird_ord_hdr[0] = info->responder_resources;
1731                ird_ord_hdr[1] = 1;
1732                conn_param.private_data = ird_ord_hdr;
1733                conn_param.private_data_len = sizeof(ird_ord_hdr);
1734        } else {
1735                conn_param.private_data = NULL;
1736                conn_param.private_data_len = 0;
1737        }
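        /*
         * For iWARP the IRD/ORD hint is carried in the connect private data
         * as two 32-bit words: our responder resources and an initiator
         * depth of 1. IB/RoCE presumably exchange these values in the CM
         * messages themselves, so no private data is needed there.
         */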
1738
1739        conn_param.retry_count = SMBD_CM_RETRY;
1740        conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1741        conn_param.flow_control = 0;
1742        init_waitqueue_head(&info->wait_destroy);
1743
1744        log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1745                &addr_in->sin_addr, port);
1746
1747        init_waitqueue_head(&info->conn_wait);
1748        rc = rdma_connect(info->id, &conn_param);
1749        if (rc) {
1750                log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1751                goto rdma_connect_failed;
1752        }
1753
1754        wait_event_interruptible(
1755                info->conn_wait, info->transport_status != SMBD_CONNECTING);
1756
1757        if (info->transport_status != SMBD_CONNECTED) {
1758                log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1759                goto rdma_connect_failed;
1760        }
1761
1762        log_rdma_event(INFO, "rdma_connect connected\n");
1763
1764        rc = allocate_caches_and_workqueue(info);
1765        if (rc) {
1766                log_rdma_event(ERR, "cache allocation failed\n");
1767                goto allocate_cache_failed;
1768        }
1769
1770        init_waitqueue_head(&info->wait_send_queue);
1771        init_waitqueue_head(&info->wait_reassembly_queue);
1772
1773        INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1774        INIT_DELAYED_WORK(&info->send_immediate_work, send_immediate_work);
1775        queue_delayed_work(info->workqueue, &info->idle_timer_work,
1776                info->keep_alive_interval*HZ);
1777
1778        init_waitqueue_head(&info->wait_smbd_send_pending);
1779        info->smbd_send_pending = 0;
1780
1781        init_waitqueue_head(&info->wait_smbd_recv_pending);
1782        info->smbd_recv_pending = 0;
1783
1784        init_waitqueue_head(&info->wait_send_pending);
1785        atomic_set(&info->send_pending, 0);
1786
1787        init_waitqueue_head(&info->wait_send_payload_pending);
1788        atomic_set(&info->send_payload_pending, 0);
1789
1790        INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1791        INIT_WORK(&info->destroy_work, smbd_destroy_rdma_work);
1792        INIT_WORK(&info->recv_done_work, smbd_recv_done_work);
1793        INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1794        info->new_credits_offered = 0;
1795        spin_lock_init(&info->lock_new_credits_offered);
1796
1797        rc = smbd_negotiate(info);
1798        if (rc) {
1799                log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1800                goto negotiation_failed;
1801        }
1802
1803        rc = allocate_mr_list(info);
1804        if (rc) {
1805                log_rdma_mr(ERR, "memory registration allocation failed\n");
1806                goto allocate_mr_failed;
1807        }
1808
1809        return info;
1810
1811allocate_mr_failed:
1812        /* At this point, we need to do a full transport shutdown */
1813        smbd_destroy(info);
1814        return NULL;
1815
1816negotiation_failed:
1817        cancel_delayed_work_sync(&info->idle_timer_work);
1818        destroy_caches_and_workqueue(info);
1819        info->transport_status = SMBD_NEGOTIATE_FAILED;
1820        init_waitqueue_head(&info->conn_wait);
1821        rdma_disconnect(info->id);
1822        wait_event(info->conn_wait,
1823                info->transport_status == SMBD_DISCONNECTED);
1824
1825allocate_cache_failed:
1826rdma_connect_failed:
1827        rdma_destroy_qp(info->id);
1828
1829create_qp_failed:
1830alloc_cq_failed:
1831        if (info->send_cq)
1832                ib_free_cq(info->send_cq);
1833        if (info->recv_cq)
1834                ib_free_cq(info->recv_cq);
1835
1836config_failed:
1837        ib_dealloc_pd(info->pd);
1838        rdma_destroy_id(info->id);
1839
1840create_id_failed:
1841        kfree(info);
1842        return NULL;
1843}
1844
1845struct smbd_connection *smbd_get_connection(
1846        struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1847{
1848        struct smbd_connection *ret;
1849        int port = SMBD_PORT;
1850
1851try_again:
1852        ret = _smbd_get_connection(server, dstaddr, port);
1853
1854        /* Try SMB_PORT if SMBD_PORT doesn't work */
1855        if (!ret && port == SMBD_PORT) {
1856                port = SMB_PORT;
1857                goto try_again;
1858        }
1859        return ret;
1860}
1861
1862/*
1863 * Receive data from the receive reassembly queue
1864 * All the incoming data packets are placed in the reassembly queue
1865 * buf: the buffer to read data into
1866 * size: the length of data to read
1867 * return value: actual data read
1868 * Note: this implementation copies the data from the reassembly queue to
1869 * receive buffers used by the upper layer. This is not the optimal code path.
1870 * A better way is to not have the upper layer allocate its receive buffers,
1871 * but rather borrow the buffer from the reassembly queue and return it after
1872 * the data is consumed. But this requires more upper layer changes, and also
1873 * needs to consider packet boundaries while they are still being reassembled.
1874 */
1875static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1876                unsigned int size)
1877{
1878        struct smbd_response *response;
1879        struct smbd_data_transfer *data_transfer;
1880        int to_copy, to_read, data_read, offset;
1881        u32 data_length, remaining_data_length, data_offset;
1882        int rc;
1883
1884again:
1885        if (info->transport_status != SMBD_CONNECTED) {
1886                log_read(ERR, "disconnected\n");
1887                return -ENODEV;
1888        }
1889
1890        /*
1891         * No need to hold the reassembly queue lock all the time as we are
1892         * the only one reading from the front of the queue. The transport
1893         * may add more entries to the back of the queue at the same time
1894         */
1895        log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1896                info->reassembly_data_length);
1897        if (info->reassembly_data_length >= size) {
1898                int queue_length;
1899                int queue_removed = 0;
1900
1901                /*
1902                 * Need to make sure reassembly_data_length is read before
1903                 * reading reassembly_queue_length and calling
1904                 * _get_first_reassembly. This call is lock free
1905                 * as we never read the end of the queue, which is being
1906                 * updated in SOFTIRQ context as more data is received
1907                 */
1908                virt_rmb();
1909                queue_length = info->reassembly_queue_length;
1910                data_read = 0;
1911                to_read = size;
1912                offset = info->first_entry_offset;
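                /*
                 * Walk the reassembly queue from the front, copying out of
                 * each SMBD data transfer packet. "offset" tracks how far
                 * into the current packet a previous read stopped; a fully
                 * consumed packet is returned to the receive buffer pool.
                 */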
1913                while (data_read < size) {
1914                        response = _get_first_reassembly(info);
1915                        data_transfer = smbd_response_payload(response);
1916                        data_length = le32_to_cpu(data_transfer->data_length);
1917                        remaining_data_length =
1918                                le32_to_cpu(
1919                                        data_transfer->remaining_data_length);
1920                        data_offset = le32_to_cpu(data_transfer->data_offset);
1921
1922                        /*
1923                         * The upper layer expects RFC1002 length at the
1924                         * beginning of the payload. Return it to indicate
1925                         * the total length of the packet. This minimizes the
1926                         * change to upper layer packet processing logic. This
1927                         * will eventually be removed when an intermediate
1928                         * transport layer is added
1929                         */
1930                        if (response->first_segment && size == 4) {
1931                                unsigned int rfc1002_len =
1932                                        data_length + remaining_data_length;
1933                                *((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1934                                data_read = 4;
1935                                response->first_segment = false;
1936                                log_read(INFO, "returning rfc1002 length %d\n",
1937                                        rfc1002_len);
1938                                goto read_rfc1002_done;
1939                        }
1940
1941                        to_copy = min_t(int, data_length - offset, to_read);
1942                        memcpy(
1943                                buf + data_read,
1944                                (char *)data_transfer + data_offset + offset,
1945                                to_copy);
1946
1947                        /* move on to the next buffer? */
1948                        if (to_copy == data_length - offset) {
1949                                queue_length--;
1950                                /*
1951                                 * No need to lock if we are not at the
1952                                 * end of the queue
1953                                 */
1954                                if (queue_length)
1955                                        list_del(&response->list);
1956                                else {
1957                                        spin_lock_irq(
1958                                                &info->reassembly_queue_lock);
1959                                        list_del(&response->list);
1960                                        spin_unlock_irq(
1961                                                &info->reassembly_queue_lock);
1962                                }
1963                                queue_removed++;
1964                                info->count_reassembly_queue--;
1965                                info->count_dequeue_reassembly_queue++;
1966                                put_receive_buffer(info, response);
1967                                offset = 0;
1968                                log_read(INFO, "put_receive_buffer offset=0\n");
1969                        } else
1970                                offset += to_copy;
1971
1972                        to_read -= to_copy;
1973                        data_read += to_copy;
1974
1975                        log_read(INFO, "_get_first_reassembly memcpy %d bytes "
1976                                "data_transfer_length-offset=%d after that "
1977                                "to_read=%d data_read=%d offset=%d\n",
1978                                to_copy, data_length - offset,
1979                                to_read, data_read, offset);
1980                }
1981
1982                spin_lock_irq(&info->reassembly_queue_lock);
1983                info->reassembly_data_length -= data_read;
1984                info->reassembly_queue_length -= queue_removed;
1985                spin_unlock_irq(&info->reassembly_queue_lock);
1986
1987                info->first_entry_offset = offset;
1988                log_read(INFO, "returning to thread data_read=%d "
1989                        "reassembly_data_length=%d first_entry_offset=%d\n",
1990                        data_read, info->reassembly_data_length,
1991                        info->first_entry_offset);
1992read_rfc1002_done:
1993                return data_read;
1994        }
1995
1996        log_read(INFO, "wait_event on more data\n");
1997        rc = wait_event_interruptible(
1998                info->wait_reassembly_queue,
1999                info->reassembly_data_length >= size ||
2000                        info->transport_status != SMBD_CONNECTED);
2001        /* Don't return any data if interrupted */
2002        if (rc)
2003                return -ENODEV;
2004
2005        goto again;
2006}
2007
2008/*
2009 * Receive a page from receive reassembly queue
2010 * page: the page to read data into
 * page_offset: the offset into the page to read data into
2011 * to_read: the length of data to read
2012 * return value: actual data read
2013 */
2014static int smbd_recv_page(struct smbd_connection *info,
2015                struct page *page, unsigned int page_offset,
2016                unsigned int to_read)
2017{
2018        int ret;
2019        char *to_address;
2020        void *page_address;
2021
2022        /* make sure we have the page ready for read */
2023        ret = wait_event_interruptible(
2024                info->wait_reassembly_queue,
2025                info->reassembly_data_length >= to_read ||
2026                        info->transport_status != SMBD_CONNECTED);
2027        if (ret)
2028                return ret;
2029
2030        /* now we can read from reassembly queue and not sleep */
2031        page_address = kmap_atomic(page);
2032        to_address = (char *) page_address + page_offset;
2033
2034        log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
2035                page, to_address, to_read);
2036
2037        ret = smbd_recv_buf(info, to_address, to_read);
2038        kunmap_atomic(page_address);
2039
2040        return ret;
2041}
2042
2043/*
2044 * Receive data from transport
2045 * msg: a msghdr pointing to the buffer, can be ITER_KVEC or ITER_BVEC
2046 * return: total bytes read, or 0. SMB Direct will not do a partial read.
2047 */
2048int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
2049{
2050        char *buf;
2051        struct page *page;
2052        unsigned int to_read, page_offset;
2053        int rc;
2054
2055        info->smbd_recv_pending++;
2056
2057        if (iov_iter_rw(&msg->msg_iter) == WRITE) {
2058                /* It's a bug in the upper layer to get here */
2059                cifs_dbg(VFS, "CIFS: invalid msg iter dir %u\n",
2060                         iov_iter_rw(&msg->msg_iter));
2061                rc = -EINVAL;
2062                goto out;
2063        }
2064
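        /*
         * Only the first segment of the iterator is consumed per call: a
         * KVEC iter reads into its first kvec buffer, a BVEC iter into its
         * first bvec page.
         */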
2065        switch (iov_iter_type(&msg->msg_iter)) {
2066        case ITER_KVEC:
2067                buf = msg->msg_iter.kvec->iov_base;
2068                to_read = msg->msg_iter.kvec->iov_len;
2069                rc = smbd_recv_buf(info, buf, to_read);
2070                break;
2071
2072        case ITER_BVEC:
2073                page = msg->msg_iter.bvec->bv_page;
2074                page_offset = msg->msg_iter.bvec->bv_offset;
2075                to_read = msg->msg_iter.bvec->bv_len;
2076                rc = smbd_recv_page(info, page, page_offset, to_read);
2077                break;
2078
2079        default:
2080                /* It's a bug in the upper layer to get here */
2081                cifs_dbg(VFS, "CIFS: invalid msg type %d\n",
2082                         iov_iter_type(&msg->msg_iter));
2083                rc = -EINVAL;
2084        }
2085
2086out:
2087        info->smbd_recv_pending--;
2088        wake_up(&info->wait_smbd_recv_pending);
2089
2090        /* SMBDirect will read it all or nothing */
2091        if (rc > 0)
2092                msg->msg_iter.count = 0;
2093        return rc;
2094}
2095
2096/*
2097 * Send data to transport
2098 * Each rqst is transported as a SMBDirect payload
2099 * rqst: the data to write
2100 * return value: 0 on successful write, otherwise error code
2101 */
2102int smbd_send(struct TCP_Server_Info *server, struct smb_rqst *rqst)
2103{
2104        struct smbd_connection *info = server->smbd_conn;
2105        struct kvec vec;
2106        int nvecs;
2107        int size;
2108        unsigned int buflen, remaining_data_length;
2109        int start, i, j;
2110        int max_iov_size =
2111                info->max_send_size - sizeof(struct smbd_data_transfer);
2112        struct kvec *iov;
2113        int rc;
2114
2115        info->smbd_send_pending++;
2116        if (info->transport_status != SMBD_CONNECTED) {
2117                rc = -ENODEV;
2118                goto done;
2119        }
2120
2121        /*
2122         * Skip the RFC1002 length defined in MS-SMB2 section 2.1
2123         * It is used only by the TCP transport and is carried in iov[0]
2124         * In the future we may want to add a transport layer under the
2125         * protocol layer so this is only issued to the TCP transport
2126         */
2127
2128        if (rqst->rq_iov[0].iov_len != 4) {
2129                log_write(ERR, "expected the pdu length in 1st iov, but got %zu\n", rqst->rq_iov[0].iov_len);
2130                rc = -EINVAL;
                goto done;
2131        }
2132
2133        /*
2134         * Add in the page array if there is one. The caller needs to set
2135         * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
2136         * ends at page boundary
2137         */
2138        buflen = smb_rqst_len(server, rqst);
2139
2140        if (buflen + sizeof(struct smbd_data_transfer) >
2141                info->max_fragmented_send_size) {
2142                log_write(ERR, "payload size %d > max size %d\n",
2143                        buflen, info->max_fragmented_send_size);
2144                rc = -EINVAL;
2145                goto done;
2146        }
2147
2148        iov = &rqst->rq_iov[1];
2149
2150        cifs_dbg(FYI, "Sending smb (RDMA): smb_len=%u\n", buflen);
2151        for (i = 0; i < rqst->rq_nvec-1; i++)
2152                dump_smb(iov[i].iov_base, iov[i].iov_len);
2153
2154        remaining_data_length = buflen;
2155
2156        log_write(INFO, "rqst->rq_nvec=%d rqst->rq_npages=%d rq_pagesz=%d "
2157                "rq_tailsz=%d buflen=%d\n",
2158                rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2159                rqst->rq_tailsz, buflen);
2160
2161        start = i = iov[0].iov_len ? 0 : 1;
2162        buflen = 0;
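        /*
         * Coalesce the iovs into SMBD data transfer messages carrying at
         * most max_iov_size payload bytes each. An iov larger than
         * max_iov_size is split across several sends. remaining_data_length
         * tells the peer how many more payload bytes follow in this request.
         */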
2163        while (true) {
2164                buflen += iov[i].iov_len;
2165                if (buflen > max_iov_size) {
2166                        if (i > start) {
2167                                remaining_data_length -=
2168                                        (buflen-iov[i].iov_len);
2169                                log_write(INFO, "sending iov[] from start=%d "
2170                                        "i=%d nvecs=%d "
2171                                        "remaining_data_length=%d\n",
2172                                        start, i, i-start,
2173                                        remaining_data_length);
2174                                rc = smbd_post_send_data(
2175                                        info, &iov[start], i-start,
2176                                        remaining_data_length);
2177                                if (rc)
2178                                        goto done;
2179                        } else {
2180                                /* iov[start] is too big, break it */
2181                                nvecs = (buflen+max_iov_size-1)/max_iov_size;
2182                                log_write(INFO, "iov[%d] iov_base=%p buflen=%d"
2183                                        " break to %d vectors\n",
2184                                        start, iov[start].iov_base,
2185                                        buflen, nvecs);
2186                                for (j = 0; j < nvecs; j++) {
2187                                        vec.iov_base =
2188                                                (char *)iov[start].iov_base +
2189                                                j*max_iov_size;
2190                                        vec.iov_len = max_iov_size;
2191                                        if (j == nvecs-1)
2192                                                vec.iov_len =
2193                                                        buflen -
2194                                                        max_iov_size*(nvecs-1);
2195                                        remaining_data_length -= vec.iov_len;
2196                                        log_write(INFO,
2197                                                "sending vec j=%d iov_base=%p"
2198                                                " iov_len=%zu "
2199                                                "remaining_data_length=%d\n",
2200                                                j, vec.iov_base, vec.iov_len,
2201                                                remaining_data_length);
2202                                        rc = smbd_post_send_data(
2203                                                info, &vec, 1,
2204                                                remaining_data_length);
2205                                        if (rc)
2206                                                goto done;
2207                                }
2208                                i++;
2209                                if (i == rqst->rq_nvec-1)
2210                                        break;
2211                        }
2212                        start = i;
2213                        buflen = 0;
2214                } else {
2215                        i++;
2216                        if (i == rqst->rq_nvec-1) {
2217                                /* send out all remaining vecs */
2218                                remaining_data_length -= buflen;
2219                                log_write(INFO,
2220                                        "sending iov[] from start=%d i=%d "
2221                                        "nvecs=%d remaining_data_length=%d\n",
2222                                        start, i, i-start,
2223                                        remaining_data_length);
2224                                rc = smbd_post_send_data(info, &iov[start],
2225                                        i-start, remaining_data_length);
2226                                if (rc)
2227                                        goto done;
2228                                break;
2229                        }
2230                }
2231                log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2232        }
2233
2234        /* now sending pages if there are any */
2235        for (i = 0; i < rqst->rq_npages; i++) {
2236                unsigned int offset;
2237
2238                rqst_page_get_length(rqst, i, &buflen, &offset);
2239                nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2240                log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2241                        buflen, nvecs);
2242                for (j = 0; j < nvecs; j++) {
2243                        size = max_iov_size;
2244                        if (j == nvecs-1)
2245                                size = buflen - j*max_iov_size;
2246                        remaining_data_length -= size;
2247                        log_write(INFO, "sending pages i=%d offset=%d size=%d"
2248                                " remaining_data_length=%d\n",
2249                                i, j*max_iov_size+offset, size,
2250                                remaining_data_length);
2251                        rc = smbd_post_send_page(
2252                                info, rqst->rq_pages[i],
2253                                j*max_iov_size + offset,
2254                                size, remaining_data_length);
2255                        if (rc)
2256                                goto done;
2257                }
2258        }
2259
2260done:
2261        /*
2262         * As an optimization, we don't wait for individual I/O to finish
2263         * before sending the next one.
2264         * Send them all and wait for the pending send count to get to 0,
2265         * which means all the I/Os have been sent out and we are good to return
2266         */
2267
2268        wait_event(info->wait_send_payload_pending,
2269                atomic_read(&info->send_payload_pending) == 0);
2270
2271        info->smbd_send_pending--;
2272        wake_up(&info->wait_smbd_send_pending);
2273
2274        return rc;
2275}
2276
2277static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2278{
2279        struct smbd_mr *mr;
2280        struct ib_cqe *cqe;
2281
2282        if (wc->status) {
2283                log_rdma_mr(ERR, "status=%d\n", wc->status);
2284                cqe = wc->wr_cqe;
2285                mr = container_of(cqe, struct smbd_mr, cqe);
2286                smbd_disconnect_rdma_connection(mr->conn);
2287        }
2288}
2289
2290/*
2291 * The work queue function that recovers MRs
2292 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2293 * again. Both calls are slow, so finish them in a workqueue. This does not
2294 * block the I/O path.
2295 * There is one workqueue that recovers MRs; there is no need to lock as the
2296 * I/O requests calling smbd_register_mr never update the links in the
2297 * mr_list.
2298 */
2299static void smbd_mr_recovery_work(struct work_struct *work)
2300{
2301        struct smbd_connection *info =
2302                container_of(work, struct smbd_connection, mr_recovery_work);
2303        struct smbd_mr *smbdirect_mr;
2304        int rc;
2305
2306        list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2307                if (smbdirect_mr->state == MR_INVALIDATED)
2308                        ib_dma_unmap_sg(
2309                                info->id->device, smbdirect_mr->sgl,
2310                                smbdirect_mr->sgl_count,
2311                                smbdirect_mr->dir);
2312                else if (smbdirect_mr->state == MR_ERROR) {
2313
2314                        /* recover this MR entry */
2315                        rc = ib_dereg_mr(smbdirect_mr->mr);
2316                        if (rc) {
2317                                log_rdma_mr(ERR,
2318                                        "ib_dereg_mr failed rc=%x\n",
2319                                        rc);
2320                                smbd_disconnect_rdma_connection(info);
2321                                continue;
2322                        }
2323
2324                        smbdirect_mr->mr = ib_alloc_mr(
2325                                info->pd, info->mr_type,
2326                                info->max_frmr_depth);
2327                        if (IS_ERR(smbdirect_mr->mr)) {
2328                                log_rdma_mr(ERR,
2329                                        "ib_alloc_mr failed mr_type=%x "
2330                                        "max_frmr_depth=%x\n",
2331                                        info->mr_type,
2332                                        info->max_frmr_depth);
2333                                smbd_disconnect_rdma_connection(info);
2334                                continue;
2335                        }
2336                } else
2337                        /* This MR is being used, don't recover it */
2338                        continue;
2339
2340                smbdirect_mr->state = MR_READY;
2341
2342                /* smbdirect_mr->state is updated by this function
2343                 * and is read and updated by I/O issuing CPUs trying
2344                 * to get an MR; the call to atomic_inc_return
2345                 * implies a memory barrier and guarantees this
2346                 * value is updated before waking up any calls to
2347                 * get_mr() from the I/O issuing CPUs
2348                 */
2349                if (atomic_inc_return(&info->mr_ready_count) == 1)
2350                        wake_up_interruptible(&info->wait_mr);
2351        }
2352}
2353
2354static void destroy_mr_list(struct smbd_connection *info)
2355{
2356        struct smbd_mr *mr, *tmp;
2357
2358        cancel_work_sync(&info->mr_recovery_work);
2359        list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2360                if (mr->state == MR_INVALIDATED)
2361                        ib_dma_unmap_sg(info->id->device, mr->sgl,
2362                                mr->sgl_count, mr->dir);
2363                ib_dereg_mr(mr->mr);
2364                kfree(mr->sgl);
2365                kfree(mr);
2366        }
2367}
2368
2369/*
2370 * Allocate MRs used for RDMA read/write
2371 * The number of MRs will not exceed hardware capability in responder_resources
2372 * All MRs are kept in mr_list. An MR can be recovered after it's used
2373 * Recovery is done in smbd_mr_recovery_work. The content of a list entry
2374 * changes as MRs are used and recovered for I/O, but the list links do not change
2375 */
2376static int allocate_mr_list(struct smbd_connection *info)
2377{
2378        int i;
2379        struct smbd_mr *smbdirect_mr, *tmp;
2380
2381        INIT_LIST_HEAD(&info->mr_list);
2382        init_waitqueue_head(&info->wait_mr);
2383        spin_lock_init(&info->mr_list_lock);
2384        atomic_set(&info->mr_ready_count, 0);
2385        atomic_set(&info->mr_used_count, 0);
2386        init_waitqueue_head(&info->wait_for_mr_cleanup);
2387        /* Allocate more MRs (2x) than hardware responder_resources */
2388        for (i = 0; i < info->responder_resources * 2; i++) {
2389                smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2390                if (!smbdirect_mr)
2391                        goto out;
2392                smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2393                                        info->max_frmr_depth);
2394                if (IS_ERR(smbdirect_mr->mr)) {
2395                        log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
2396                                "max_frmr_depth=%x\n",
2397                                info->mr_type, info->max_frmr_depth);
2398                        goto out;
2399                }
2400                smbdirect_mr->sgl = kcalloc(
2401                                        info->max_frmr_depth,
2402                                        sizeof(struct scatterlist),
2403                                        GFP_KERNEL);
2404                if (!smbdirect_mr->sgl) {
2405                        log_rdma_mr(ERR, "failed to allocate sgl\n");
2406                        ib_dereg_mr(smbdirect_mr->mr);
2407                        goto out;
2408                }
2409                smbdirect_mr->state = MR_READY;
2410                smbdirect_mr->conn = info;
2411
2412                list_add_tail(&smbdirect_mr->list, &info->mr_list);
2413                atomic_inc(&info->mr_ready_count);
2414        }
2415        INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2416        return 0;
2417
2418out:
2419        kfree(smbdirect_mr);
2420
2421        list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2422                ib_dereg_mr(smbdirect_mr->mr);
2423                kfree(smbdirect_mr->sgl);
2424                kfree(smbdirect_mr);
2425        }
2426        return -ENOMEM;
2427}
2428
2429/*
2430 * Get an MR from mr_list. This function waits until there is at least one
2431 * MR available in the list. It may access the list while the
2432 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2433 * as they never modify the same places. However, there may be several CPUs
2434 * issuing I/O trying to get an MR at the same time; mr_list_lock is used to
2435 * protect against this situation.
2436 */
2437static struct smbd_mr *get_mr(struct smbd_connection *info)
2438{
2439        struct smbd_mr *ret;
2440        int rc;
2441again:
2442        rc = wait_event_interruptible(info->wait_mr,
2443                atomic_read(&info->mr_ready_count) ||
2444                info->transport_status != SMBD_CONNECTED);
2445        if (rc) {
2446                log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2447                return NULL;
2448        }
2449
2450        if (info->transport_status != SMBD_CONNECTED) {
2451                log_rdma_mr(ERR, "info->transport_status=%x\n",
2452                        info->transport_status);
2453                return NULL;
2454        }
2455
2456        spin_lock(&info->mr_list_lock);
2457        list_for_each_entry(ret, &info->mr_list, list) {
2458                if (ret->state == MR_READY) {
2459                        ret->state = MR_REGISTERED;
2460                        spin_unlock(&info->mr_list_lock);
2461                        atomic_dec(&info->mr_ready_count);
2462                        atomic_inc(&info->mr_used_count);
2463                        return ret;
2464                }
2465        }
2466
2467        spin_unlock(&info->mr_list_lock);
2468        /*
2469         * It is possible that we could fail to get an MR because other processes
2470         * may try to acquire an MR at the same time. If this is the case, retry.
2471         */
2472        goto again;
2473}
2474
2475/*
2476 * Register memory for RDMA read/write
2477 * pages[]: the list of pages to register memory with
2478 * num_pages: the number of pages to register
 * offset: the offset into the first page
2479 * tailsz: if non-zero, the bytes to register in the last page
2480 * writing: true if this is a RDMA write (SMB read), false for RDMA read
2481 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2482 * return value: the MR registered, NULL if failed.
2483 */
2484struct smbd_mr *smbd_register_mr(
2485        struct smbd_connection *info, struct page *pages[], int num_pages,
2486        int offset, int tailsz, bool writing, bool need_invalidate)
2487{
2488        struct smbd_mr *smbdirect_mr;
2489        int rc, i;
2490        enum dma_data_direction dir;
2491        struct ib_reg_wr *reg_wr;
2492
2493        if (num_pages > info->max_frmr_depth) {
2494                log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2495                        num_pages, info->max_frmr_depth);
2496                return NULL;
2497        }
2498
2499        smbdirect_mr = get_mr(info);
2500        if (!smbdirect_mr) {
2501                log_rdma_mr(ERR, "get_mr returning NULL\n");
2502                return NULL;
2503        }
2504        smbdirect_mr->need_invalidate = need_invalidate;
2505        smbdirect_mr->sgl_count = num_pages;
2506        sg_init_table(smbdirect_mr->sgl, num_pages);
2507
2508        log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
2509                        num_pages, offset, tailsz);
2510
2511        if (num_pages == 1) {
2512                sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
2513                goto skip_multiple_pages;
2514        }
2515
2516        /* We have at least two pages to register */
2517        sg_set_page(
2518                &smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
2519        i = 1;
2520        while (i < num_pages - 1) {
2521                sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2522                i++;
2523        }
2524        sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2525                tailsz ? tailsz : PAGE_SIZE, 0);
2526
2527skip_multiple_pages:
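        /*
         * sgl[] now describes the entire buffer to register: the first page
         * starting at "offset", any middle pages in full, and the last page
         * for tailsz bytes (or a full page if tailsz is 0).
         */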
2528        dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2529        smbdirect_mr->dir = dir;
2530        rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2531        if (!rc) {
2532                log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2533                        num_pages, dir, rc);
2534                goto dma_map_error;
2535        }
2536
2537        rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2538                NULL, PAGE_SIZE);
2539        if (rc != num_pages) {
2540                log_rdma_mr(ERR,
2541                        "ib_map_mr_sg failed rc = %d num_pages = %x\n",
2542                        rc, num_pages);
2543                goto map_mr_error;
2544        }
2545
2546        ib_update_fast_reg_key(smbdirect_mr->mr,
2547                ib_inc_rkey(smbdirect_mr->mr->rkey));
2548        reg_wr = &smbdirect_mr->wr;
2549        reg_wr->wr.opcode = IB_WR_REG_MR;
2550        smbdirect_mr->cqe.done = register_mr_done;
2551        reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2552        reg_wr->wr.num_sge = 0;
2553        reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2554        reg_wr->mr = smbdirect_mr->mr;
2555        reg_wr->key = smbdirect_mr->mr->rkey;
2556        reg_wr->access = writing ?
2557                        IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2558                        IB_ACCESS_REMOTE_READ;
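        /*
         * Access flags: if the peer will RDMA write into this memory (an
         * SMB read from our side), it needs remote and local write access;
         * if the peer will RDMA read from it (an SMB write), remote read
         * access is enough.
         */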
2559
2560        /*
2561         * There is no need to wait for completion of ib_post_send
2562         * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2563         * on the next ib_post_send when we actually send I/O to the remote peer
2564         */
2565        rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
2566        if (!rc)
2567                return smbdirect_mr;
2568
2569        log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2570                rc, reg_wr->key);
2571
2572        /* If all failed, attempt to recover this MR by setting it to MR_ERROR */
2573map_mr_error:
2574        ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2575                smbdirect_mr->sgl_count, smbdirect_mr->dir);
2576
2577dma_map_error:
2578        smbdirect_mr->state = MR_ERROR;
2579        if (atomic_dec_and_test(&info->mr_used_count))
2580                wake_up(&info->wait_for_mr_cleanup);
2581
2582        smbd_disconnect_rdma_connection(info);
2583
2584        return NULL;
2585}
2586
2587static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2588{
2589        struct smbd_mr *smbdirect_mr;
2590        struct ib_cqe *cqe;
2591
2592        cqe = wc->wr_cqe;
2593        smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2594        smbdirect_mr->state = MR_INVALIDATED;
2595        if (wc->status != IB_WC_SUCCESS) {
2596                log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2597                smbdirect_mr->state = MR_ERROR;
2598        }
2599        complete(&smbdirect_mr->invalidate_done);
2600}
2601
2602/*
2603 * Deregister an MR after I/O is done
2604 * This function may wait if remote invalidation is not used
2605 * and we have to locally invalidate the buffer to prevent the data from being
2606 * modified by the remote peer after the upper layer consumes it
2607 */
2608int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2609{
2610        struct ib_send_wr *wr;
2611        struct smbd_connection *info = smbdirect_mr->conn;
2612        int rc = 0;
2613
2614        if (smbdirect_mr->need_invalidate) {
2615                /* Need to finish local invalidation before returning */
2616                wr = &smbdirect_mr->inv_wr;
2617                wr->opcode = IB_WR_LOCAL_INV;
2618                smbdirect_mr->cqe.done = local_inv_done;
2619                wr->wr_cqe = &smbdirect_mr->cqe;
2620                wr->num_sge = 0;
2621                wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2622                wr->send_flags = IB_SEND_SIGNALED;
2623
2624                init_completion(&smbdirect_mr->invalidate_done);
2625                rc = ib_post_send(info->id->qp, wr, NULL);
2626                if (rc) {
2627                        log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2628                        smbd_disconnect_rdma_connection(info);
2629                        goto done;
2630                }
2631                wait_for_completion(&smbdirect_mr->invalidate_done);
2632                smbdirect_mr->need_invalidate = false;
2633        } else
2634                /*
2635                 * For remote invalidation, just set it to MR_INVALIDATED
2636                 * and defer to mr_recovery_work to recover the MR for next use
2637                 */
2638                smbdirect_mr->state = MR_INVALIDATED;
2639
2640        /*
2641         * Schedule the work to do MR recovery for future I/Os
2642         * MR recovery is slow and we don't want it to block the current I/O
2643         */
2644        queue_work(info->workqueue, &info->mr_recovery_work);
2645
2646done:
2647        if (atomic_dec_and_test(&info->mr_used_count))
2648                wake_up(&info->wait_for_mr_cleanup);
2649
2650        return rc;
2651}
2652