linux/drivers/staging/lustre/lustre/ptlrpc/client.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 */
  32
  33/** Implementation of client-side PortalRPC interfaces */
  34
  35#define DEBUG_SUBSYSTEM S_RPC
  36
  37#include "../include/obd_support.h"
  38#include "../include/obd_class.h"
  39#include "../include/lustre_lib.h"
  40#include "../include/lustre_ha.h"
  41#include "../include/lustre_import.h"
  42#include "../include/lustre_req_layout.h"
  43
  44#include "ptlrpc_internal.h"
  45
  46static int ptlrpc_send_new_req(struct ptlrpc_request *req);
  47static int ptlrpcd_check_work(struct ptlrpc_request *req);
  48static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
  49
  50/**
  51 * Initialize passed in client structure \a cl.
  52 */
  53void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
  54                        struct ptlrpc_client *cl)
  55{
  56        cl->cli_request_portal = req_portal;
  57        cl->cli_reply_portal = rep_portal;
  58        cl->cli_name = name;
  59}
  60EXPORT_SYMBOL(ptlrpc_init_client);
  61
  62/**
  63 * Return PortalRPC connection for remote uud \a uuid
  64 */
  65struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
  66{
  67        struct ptlrpc_connection *c;
  68        lnet_nid_t self;
  69        lnet_process_id_t peer;
  70        int err;
  71
  72        /*
  73         * ptlrpc_uuid_to_peer() initializes its 2nd parameter
  74         * before accessing its values.
  75         * coverity[uninit_use_in_call]
  76         */
  77        err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
  78        if (err != 0) {
  79                CNETERR("cannot find peer %s!\n", uuid->uuid);
  80                return NULL;
  81        }
  82
  83        c = ptlrpc_connection_get(peer, self, uuid);
  84        if (c) {
  85                memcpy(c->c_remote_uuid.uuid,
  86                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
  87        }
  88
  89        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
  90
  91        return c;
  92}
  93
  94/**
  95 * Allocate and initialize new bulk descriptor on the sender.
  96 * Returns pointer to the descriptor or NULL on error.
  97 */
  98struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
  99                                         unsigned type, unsigned portal)
 100{
 101        struct ptlrpc_bulk_desc *desc;
 102        int i;
 103
 104        desc = kzalloc(offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]),
 105                       GFP_NOFS);
 106        if (!desc)
 107                return NULL;
 108
 109        spin_lock_init(&desc->bd_lock);
 110        init_waitqueue_head(&desc->bd_waitq);
 111        desc->bd_max_iov = npages;
 112        desc->bd_iov_count = 0;
 113        desc->bd_portal = portal;
 114        desc->bd_type = type;
 115        desc->bd_md_count = 0;
 116        LASSERT(max_brw > 0);
 117        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
 118        /*
 119         * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
 120         * node. Negotiated ocd_brw_size will always be <= this number.
 121         */
 122        for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
 123                LNetInvalidateHandle(&desc->bd_mds[i]);
 124
 125        return desc;
 126}
 127
 128/**
 129 * Prepare bulk descriptor for specified outgoing request \a req that
 130 * can fit \a npages * pages. \a type is bulk type. \a portal is where
 131 * the bulk to be sent. Used on client-side.
 132 * Returns pointer to newly allocated initialized bulk descriptor or NULL on
 133 * error.
 134 */
 135struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
 136                                              unsigned npages, unsigned max_brw,
 137                                              unsigned type, unsigned portal)
 138{
 139        struct obd_import *imp = req->rq_import;
 140        struct ptlrpc_bulk_desc *desc;
 141
 142        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
 143        desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
 144        if (!desc)
 145                return NULL;
 146
 147        desc->bd_import_generation = req->rq_import_generation;
 148        desc->bd_import = class_import_get(imp);
 149        desc->bd_req = req;
 150
 151        desc->bd_cbid.cbid_fn = client_bulk_callback;
 152        desc->bd_cbid.cbid_arg = desc;
 153
 154        /* This makes req own desc, and free it when she frees herself */
 155        req->rq_bulk = desc;
 156
 157        return desc;
 158}
 159EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
 160
 161/**
 162 * Add a page \a page to the bulk descriptor \a desc.
 163 * Data to transfer in the page starts at offset \a pageoffset and
 164 * amount of data to transfer from the page is \a len
 165 */
 166void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 167                             struct page *page, int pageoffset, int len, int pin)
 168{
 169        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
 170        LASSERT(page);
 171        LASSERT(pageoffset >= 0);
 172        LASSERT(len > 0);
 173        LASSERT(pageoffset + len <= PAGE_SIZE);
 174
 175        desc->bd_nob += len;
 176
 177        if (pin)
 178                get_page(page);
 179
 180        ptlrpc_add_bulk_page(desc, page, pageoffset, len);
 181}
 182EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
 183
 184/**
 185 * Uninitialize and free bulk descriptor \a desc.
 186 * Works on bulk descriptors both from server and client side.
 187 */
 188void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
 189{
 190        int i;
 191
 192        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
 193        LASSERT(desc->bd_md_count == 0);         /* network hands off */
 194        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
 195
 196        sptlrpc_enc_pool_put_pages(desc);
 197
 198        if (desc->bd_export)
 199                class_export_put(desc->bd_export);
 200        else
 201                class_import_put(desc->bd_import);
 202
 203        if (unpin) {
 204                for (i = 0; i < desc->bd_iov_count; i++)
 205                        put_page(desc->bd_iov[i].bv_page);
 206        }
 207
 208        kfree(desc);
 209}
 210EXPORT_SYMBOL(__ptlrpc_free_bulk);
 211
 212/**
 213 * Set server timelimit for this req, i.e. how long are we willing to wait
 214 * for reply before timing out this request.
 215 */
 216void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
 217{
 218        __u32 serv_est;
 219        int idx;
 220        struct imp_at *at;
 221
 222        LASSERT(req->rq_import);
 223
 224        if (AT_OFF) {
 225                /*
 226                 * non-AT settings
 227                 *
 228                 * \a imp_server_timeout means this is reverse import and
 229                 * we send (currently only) ASTs to the client and cannot afford
 230                 * to wait too long for the reply, otherwise the other client
 231                 * (because of which we are sending this request) would
 232                 * timeout waiting for us
 233                 */
 234                req->rq_timeout = req->rq_import->imp_server_timeout ?
 235                                  obd_timeout / 2 : obd_timeout;
 236        } else {
 237                at = &req->rq_import->imp_at;
 238                idx = import_at_get_index(req->rq_import,
 239                                          req->rq_request_portal);
 240                serv_est = at_get(&at->iat_service_estimate[idx]);
 241                req->rq_timeout = at_est2timeout(serv_est);
 242        }
 243        /*
 244         * We could get even fancier here, using history to predict increased
 245         * loading...
 246         */
 247
 248        /*
 249         * Let the server know what this RPC timeout is by putting it in the
 250         * reqmsg
 251         */
 252        lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
 253}
 254EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 255
 256/* Adjust max service estimate based on server value */
 257static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
 258                                  unsigned int serv_est)
 259{
 260        int idx;
 261        unsigned int oldse;
 262        struct imp_at *at;
 263
 264        LASSERT(req->rq_import);
 265        at = &req->rq_import->imp_at;
 266
 267        idx = import_at_get_index(req->rq_import, req->rq_request_portal);
 268        /*
 269         * max service estimates are tracked on the server side,
 270         * so just keep minimal history here
 271         */
 272        oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
 273        if (oldse != 0)
 274                CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d has changed from %d to %d\n",
 275                       req->rq_import->imp_obd->obd_name, req->rq_request_portal,
 276                       oldse, at_get(&at->iat_service_estimate[idx]));
 277}
 278
 279/* Expected network latency per remote node (secs) */
 280int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
 281{
 282        return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
 283}
 284
 285/* Adjust expected network latency */
 286void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
 287                               unsigned int service_time)
 288{
 289        unsigned int nl, oldnl;
 290        struct imp_at *at;
 291        time64_t now = ktime_get_real_seconds();
 292
 293        LASSERT(req->rq_import);
 294
 295        if (service_time > now - req->rq_sent + 3) {
 296                /*
 297                 * bz16408, however, this can also happen if early reply
 298                 * is lost and client RPC is expired and resent, early reply
 299                 * or reply of original RPC can still be fit in reply buffer
 300                 * of resent RPC, now client is measuring time from the
 301                 * resent time, but server sent back service time of original
 302                 * RPC.
 303                 */
 304                CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
 305                       D_ADAPTTO : D_WARNING,
 306                       "Reported service time %u > total measured time "
 307                       CFS_DURATION_T"\n", service_time,
 308                       (long)(now - req->rq_sent));
 309                return;
 310        }
 311
 312        /* Network latency is total time less server processing time */
 313        nl = max_t(int, now - req->rq_sent -
 314                        service_time, 0) + 1; /* st rounding */
 315        at = &req->rq_import->imp_at;
 316
 317        oldnl = at_measured(&at->iat_net_latency, nl);
 318        if (oldnl != 0)
 319                CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) has changed from %d to %d\n",
 320                       req->rq_import->imp_obd->obd_name,
 321                       obd_uuid2str(
 322                               &req->rq_import->imp_connection->c_remote_uuid),
 323                       oldnl, at_get(&at->iat_net_latency));
 324}
 325
 326static int unpack_reply(struct ptlrpc_request *req)
 327{
 328        int rc;
 329
 330        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
 331                rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
 332                if (rc) {
 333                        DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
 334                        return -EPROTO;
 335                }
 336        }
 337
 338        rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
 339        if (rc) {
 340                DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
 341                return -EPROTO;
 342        }
 343        return 0;
 344}
 345
 346/**
 347 * Handle an early reply message, called with the rq_lock held.
 348 * If anything goes wrong just ignore it - same as if it never happened
 349 */
 350static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
 351        __must_hold(&req->rq_lock)
 352{
 353        struct ptlrpc_request *early_req;
 354        time64_t olddl;
 355        int rc;
 356
 357        req->rq_early = 0;
 358        spin_unlock(&req->rq_lock);
 359
 360        rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
 361        if (rc) {
 362                spin_lock(&req->rq_lock);
 363                return rc;
 364        }
 365
 366        rc = unpack_reply(early_req);
 367        if (rc) {
 368                sptlrpc_cli_finish_early_reply(early_req);
 369                spin_lock(&req->rq_lock);
 370                return rc;
 371        }
 372
 373        /*
 374         * Use new timeout value just to adjust the local value for this
 375         * request, don't include it into at_history. It is unclear yet why
 376         * service time increased and should it be counted or skipped, e.g.
 377         * that can be recovery case or some error or server, the real reply
 378         * will add all new data if it is worth to add.
 379         */
 380        req->rq_timeout = lustre_msg_get_timeout(early_req->rq_repmsg);
 381        lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
 382
 383        /* Network latency can be adjusted, it is pure network delays */
 384        ptlrpc_at_adj_net_latency(req,
 385                                  lustre_msg_get_service_time(early_req->rq_repmsg));
 386
 387        sptlrpc_cli_finish_early_reply(early_req);
 388
 389        spin_lock(&req->rq_lock);
 390        olddl = req->rq_deadline;
 391        /*
 392         * server assumes it now has rq_timeout from when the request
 393         * arrived, so the client should give it at least that long.
 394         * since we don't know the arrival time we'll use the original
 395         * sent time
 396         */
 397        req->rq_deadline = req->rq_sent + req->rq_timeout +
 398                           ptlrpc_at_get_net_latency(req);
 399
 400        DEBUG_REQ(D_ADAPTTO, req,
 401                  "Early reply #%d, new deadline in %lds (%lds)",
 402                  req->rq_early_count,
 403                  (long)(req->rq_deadline - ktime_get_real_seconds()),
 404                  (long)(req->rq_deadline - olddl));
 405
 406        return rc;
 407}
 408
 409static struct kmem_cache *request_cache;
 410
 411int ptlrpc_request_cache_init(void)
 412{
 413        request_cache = kmem_cache_create("ptlrpc_cache",
 414                                          sizeof(struct ptlrpc_request),
 415                                          0, SLAB_HWCACHE_ALIGN, NULL);
 416        return !request_cache ? -ENOMEM : 0;
 417}
 418
 419void ptlrpc_request_cache_fini(void)
 420{
 421        kmem_cache_destroy(request_cache);
 422}
 423
 424struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags)
 425{
 426        struct ptlrpc_request *req;
 427
 428        req = kmem_cache_zalloc(request_cache, flags);
 429        return req;
 430}
 431
 432void ptlrpc_request_cache_free(struct ptlrpc_request *req)
 433{
 434        kmem_cache_free(request_cache, req);
 435}
 436
 437/**
 438 * Wind down request pool \a pool.
 439 * Frees all requests from the pool too
 440 */
 441void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
 442{
 443        struct list_head *l, *tmp;
 444        struct ptlrpc_request *req;
 445
 446        spin_lock(&pool->prp_lock);
 447        list_for_each_safe(l, tmp, &pool->prp_req_list) {
 448                req = list_entry(l, struct ptlrpc_request, rq_list);
 449                list_del(&req->rq_list);
 450                LASSERT(req->rq_reqbuf);
 451                LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
 452                kvfree(req->rq_reqbuf);
 453                ptlrpc_request_cache_free(req);
 454        }
 455        spin_unlock(&pool->prp_lock);
 456        kfree(pool);
 457}
 458EXPORT_SYMBOL(ptlrpc_free_rq_pool);
 459
 460/**
 461 * Allocates, initializes and adds \a num_rq requests to the pool \a pool
 462 */
 463int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
 464{
 465        int i;
 466        int size = 1;
 467
 468        while (size < pool->prp_rq_size)
 469                size <<= 1;
 470
 471        LASSERTF(list_empty(&pool->prp_req_list) ||
 472                 size == pool->prp_rq_size,
 473                 "Trying to change pool size with nonempty pool from %d to %d bytes\n",
 474                 pool->prp_rq_size, size);
 475
 476        spin_lock(&pool->prp_lock);
 477        pool->prp_rq_size = size;
 478        for (i = 0; i < num_rq; i++) {
 479                struct ptlrpc_request *req;
 480                struct lustre_msg *msg;
 481
 482                spin_unlock(&pool->prp_lock);
 483                req = ptlrpc_request_cache_alloc(GFP_NOFS);
 484                if (!req)
 485                        return i;
 486                msg = libcfs_kvzalloc(size, GFP_NOFS);
 487                if (!msg) {
 488                        ptlrpc_request_cache_free(req);
 489                        return i;
 490                }
 491                req->rq_reqbuf = msg;
 492                req->rq_reqbuf_len = size;
 493                req->rq_pool = pool;
 494                spin_lock(&pool->prp_lock);
 495                list_add_tail(&req->rq_list, &pool->prp_req_list);
 496        }
 497        spin_unlock(&pool->prp_lock);
 498        return num_rq;
 499}
 500EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
 501
 502/**
 503 * Create and initialize new request pool with given attributes:
 504 * \a num_rq - initial number of requests to create for the pool
 505 * \a msgsize - maximum message size possible for requests in thid pool
 506 * \a populate_pool - function to be called when more requests need to be added
 507 *                  to the pool
 508 * Returns pointer to newly created pool or NULL on error.
 509 */
 510struct ptlrpc_request_pool *
 511ptlrpc_init_rq_pool(int num_rq, int msgsize,
 512                    int (*populate_pool)(struct ptlrpc_request_pool *, int))
 513{
 514        struct ptlrpc_request_pool *pool;
 515
 516        pool = kzalloc(sizeof(struct ptlrpc_request_pool), GFP_NOFS);
 517        if (!pool)
 518                return NULL;
 519
 520        /*
 521         * Request next power of two for the allocation, because internally
 522         * kernel would do exactly this
 523         */
 524
 525        spin_lock_init(&pool->prp_lock);
 526        INIT_LIST_HEAD(&pool->prp_req_list);
 527        pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
 528        pool->prp_populate = populate_pool;
 529
 530        populate_pool(pool, num_rq);
 531
 532        return pool;
 533}
 534EXPORT_SYMBOL(ptlrpc_init_rq_pool);
 535
 536/**
 537 * Fetches one request from pool \a pool
 538 */
 539static struct ptlrpc_request *
 540ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
 541{
 542        struct ptlrpc_request *request;
 543        struct lustre_msg *reqbuf;
 544
 545        if (!pool)
 546                return NULL;
 547
 548        spin_lock(&pool->prp_lock);
 549
 550        /*
 551         * See if we have anything in a pool, and bail out if nothing,
 552         * in writeout path, where this matters, this is safe to do, because
 553         * nothing is lost in this case, and when some in-flight requests
 554         * complete, this code will be called again.
 555         */
 556        if (unlikely(list_empty(&pool->prp_req_list))) {
 557                spin_unlock(&pool->prp_lock);
 558                return NULL;
 559        }
 560
 561        request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
 562                             rq_list);
 563        list_del_init(&request->rq_list);
 564        spin_unlock(&pool->prp_lock);
 565
 566        LASSERT(request->rq_reqbuf);
 567        LASSERT(request->rq_pool);
 568
 569        reqbuf = request->rq_reqbuf;
 570        memset(request, 0, sizeof(*request));
 571        request->rq_reqbuf = reqbuf;
 572        request->rq_reqbuf_len = pool->prp_rq_size;
 573        request->rq_pool = pool;
 574
 575        return request;
 576}
 577
 578/**
 579 * Returns freed \a request to pool.
 580 */
 581static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
 582{
 583        struct ptlrpc_request_pool *pool = request->rq_pool;
 584
 585        spin_lock(&pool->prp_lock);
 586        LASSERT(list_empty(&request->rq_list));
 587        LASSERT(!request->rq_receiving_reply);
 588        list_add_tail(&request->rq_list, &pool->prp_req_list);
 589        spin_unlock(&pool->prp_lock);
 590}
 591
 592int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 593                             __u32 version, int opcode, char **bufs,
 594                             struct ptlrpc_cli_ctx *ctx)
 595{
 596        int count;
 597        struct obd_import *imp;
 598        __u32 *lengths;
 599        int rc;
 600
 601        count = req_capsule_filled_sizes(&request->rq_pill, RCL_CLIENT);
 602        imp = request->rq_import;
 603        lengths = request->rq_pill.rc_area[RCL_CLIENT];
 604
 605        if (unlikely(ctx)) {
 606                request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
 607        } else {
 608                rc = sptlrpc_req_get_ctx(request);
 609                if (rc)
 610                        goto out_free;
 611        }
 612        sptlrpc_req_set_flavor(request, opcode);
 613
 614        rc = lustre_pack_request(request, imp->imp_msg_magic, count,
 615                                 lengths, bufs);
 616        if (rc)
 617                goto out_ctx;
 618
 619        lustre_msg_add_version(request->rq_reqmsg, version);
 620        request->rq_send_state = LUSTRE_IMP_FULL;
 621        request->rq_type = PTL_RPC_MSG_REQUEST;
 622
 623        request->rq_req_cbid.cbid_fn = request_out_callback;
 624        request->rq_req_cbid.cbid_arg = request;
 625
 626        request->rq_reply_cbid.cbid_fn = reply_in_callback;
 627        request->rq_reply_cbid.cbid_arg = request;
 628
 629        request->rq_reply_deadline = 0;
 630        request->rq_bulk_deadline = 0;
 631        request->rq_req_deadline = 0;
 632        request->rq_phase = RQ_PHASE_NEW;
 633        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 634
 635        request->rq_request_portal = imp->imp_client->cli_request_portal;
 636        request->rq_reply_portal = imp->imp_client->cli_reply_portal;
 637
 638        ptlrpc_at_set_req_timeout(request);
 639
 640        request->rq_xid = ptlrpc_next_xid();
 641        lustre_msg_set_opc(request->rq_reqmsg, opcode);
 642
 643        /* Let's setup deadline for req/reply/bulk unlink for opcode. */
 644        if (cfs_fail_val == opcode) {
 645                time_t *fail_t = NULL, *fail2_t = NULL;
 646
 647                if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
 648                        fail_t = &request->rq_bulk_deadline;
 649                } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
 650                        fail_t = &request->rq_reply_deadline;
 651                } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) {
 652                        fail_t = &request->rq_req_deadline;
 653                } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
 654                        fail_t = &request->rq_reply_deadline;
 655                        fail2_t = &request->rq_bulk_deadline;
 656                }
 657
 658                if (fail_t) {
 659                        *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
 660
 661                        if (fail2_t)
 662                                *fail2_t = ktime_get_real_seconds() +
 663                                                 LONG_UNLINK;
 664
 665                        /* The RPC is infected, let the test change the
 666                         * fail_loc
 667                         */
 668                        set_current_state(TASK_UNINTERRUPTIBLE);
 669                        schedule_timeout(cfs_time_seconds(2));
 670                        set_current_state(TASK_RUNNING);
 671                }
 672        }
 673
 674        return 0;
 675
 676out_ctx:
 677        LASSERT(!request->rq_pool);
 678        sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
 679out_free:
 680        class_import_put(imp);
 681        return rc;
 682}
 683EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
 684
 685/**
 686 * Pack request buffers for network transfer, performing necessary encryption
 687 * steps if necessary.
 688 */
 689int ptlrpc_request_pack(struct ptlrpc_request *request,
 690                        __u32 version, int opcode)
 691{
 692        int rc;
 693
 694        rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
 695        if (rc)
 696                return rc;
 697
 698        /*
 699         * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
 700         * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
 701         * have to send old ptlrpc_body to keep interoperability with these
 702         * clients.
 703         *
 704         * Only three kinds of server->client RPCs so far:
 705         *  - LDLM_BL_CALLBACK
 706         *  - LDLM_CP_CALLBACK
 707         *  - LDLM_GL_CALLBACK
 708         *
 709         * XXX This should be removed whenever we drop the interoperability with
 710         *     the these old clients.
 711         */
 712        if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
 713            opcode == LDLM_GL_CALLBACK)
 714                req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
 715                                   sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
 716
 717        return rc;
 718}
 719EXPORT_SYMBOL(ptlrpc_request_pack);
 720
 721/**
 722 * Helper function to allocate new request on import \a imp
 723 * and possibly using existing request from pool \a pool if provided.
 724 * Returns allocated request structure with import field filled or
 725 * NULL on error.
 726 */
 727static inline
 728struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
 729                                              struct ptlrpc_request_pool *pool)
 730{
 731        struct ptlrpc_request *request;
 732
 733        request = ptlrpc_request_cache_alloc(GFP_NOFS);
 734
 735        if (!request && pool)
 736                request = ptlrpc_prep_req_from_pool(pool);
 737
 738        if (request) {
 739                ptlrpc_cli_req_init(request);
 740
 741                LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
 742                LASSERT(imp != LP_POISON);
 743                LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
 744                         imp->imp_client);
 745                LASSERT(imp->imp_client != LP_POISON);
 746
 747                request->rq_import = class_import_get(imp);
 748        } else {
 749                CERROR("request allocation out of memory\n");
 750        }
 751
 752        return request;
 753}
 754
 755/**
 756 * Helper function for creating a request.
 757 * Calls __ptlrpc_request_alloc to allocate new request structure and inits
 758 * buffer structures according to capsule template \a format.
 759 * Returns allocated request structure pointer or NULL on error.
 760 */
 761static struct ptlrpc_request *
 762ptlrpc_request_alloc_internal(struct obd_import *imp,
 763                              struct ptlrpc_request_pool *pool,
 764                              const struct req_format *format)
 765{
 766        struct ptlrpc_request *request;
 767
 768        request = __ptlrpc_request_alloc(imp, pool);
 769        if (!request)
 770                return NULL;
 771
 772        req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
 773        req_capsule_set(&request->rq_pill, format);
 774        return request;
 775}
 776
 777/**
 778 * Allocate new request structure for import \a imp and initialize its
 779 * buffer structure according to capsule template \a format.
 780 */
 781struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
 782                                            const struct req_format *format)
 783{
 784        return ptlrpc_request_alloc_internal(imp, NULL, format);
 785}
 786EXPORT_SYMBOL(ptlrpc_request_alloc);
 787
 788/**
 789 * Allocate new request structure for import \a imp from pool \a pool and
 790 * initialize its buffer structure according to capsule template \a format.
 791 */
 792struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
 793                                                 struct ptlrpc_request_pool *pool,
 794                                                 const struct req_format *format)
 795{
 796        return ptlrpc_request_alloc_internal(imp, pool, format);
 797}
 798EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
 799
 800/**
 801 * For requests not from pool, free memory of the request structure.
 802 * For requests obtained from a pool earlier, return request back to pool.
 803 */
 804void ptlrpc_request_free(struct ptlrpc_request *request)
 805{
 806        if (request->rq_pool)
 807                __ptlrpc_free_req_to_pool(request);
 808        else
 809                ptlrpc_request_cache_free(request);
 810}
 811EXPORT_SYMBOL(ptlrpc_request_free);
 812
 813/**
 814 * Allocate new request for operation \a opcode and immediately pack it for
 815 * network transfer.
 816 * Only used for simple requests like OBD_PING where the only important
 817 * part of the request is operation itself.
 818 * Returns allocated request or NULL on error.
 819 */
 820struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
 821                                                 const struct req_format *format,
 822                                                 __u32 version, int opcode)
 823{
 824        struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format);
 825        int rc;
 826
 827        if (req) {
 828                rc = ptlrpc_request_pack(req, version, opcode);
 829                if (rc) {
 830                        ptlrpc_request_free(req);
 831                        req = NULL;
 832                }
 833        }
 834        return req;
 835}
 836EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
 837
 838/**
 839 * Allocate and initialize new request set structure on the current CPT.
 840 * Returns a pointer to the newly allocated set structure or NULL on error.
 841 */
 842struct ptlrpc_request_set *ptlrpc_prep_set(void)
 843{
 844        struct ptlrpc_request_set *set;
 845        int cpt;
 846
 847        cpt = cfs_cpt_current(cfs_cpt_table, 0);
 848        set = kzalloc_node(sizeof(*set), GFP_NOFS,
 849                           cfs_cpt_spread_node(cfs_cpt_table, cpt));
 850        if (!set)
 851                return NULL;
 852        atomic_set(&set->set_refcount, 1);
 853        INIT_LIST_HEAD(&set->set_requests);
 854        init_waitqueue_head(&set->set_waitq);
 855        atomic_set(&set->set_new_count, 0);
 856        atomic_set(&set->set_remaining, 0);
 857        spin_lock_init(&set->set_new_req_lock);
 858        INIT_LIST_HEAD(&set->set_new_requests);
 859        INIT_LIST_HEAD(&set->set_cblist);
 860        set->set_max_inflight = UINT_MAX;
 861        set->set_producer = NULL;
 862        set->set_producer_arg = NULL;
 863        set->set_rc = 0;
 864
 865        return set;
 866}
 867EXPORT_SYMBOL(ptlrpc_prep_set);
 868
 869/**
 870 * Allocate and initialize new request set structure with flow control
 871 * extension. This extension allows to control the number of requests in-flight
 872 * for the whole set. A callback function to generate requests must be provided
 873 * and the request set will keep the number of requests sent over the wire to
 874 * @max_inflight.
 875 * Returns a pointer to the newly allocated set structure or NULL on error.
 876 */
 877struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
 878                                             void *arg)
 879
 880{
 881        struct ptlrpc_request_set *set;
 882
 883        set = ptlrpc_prep_set();
 884        if (!set)
 885                return NULL;
 886
 887        set->set_max_inflight = max;
 888        set->set_producer = func;
 889        set->set_producer_arg = arg;
 890
 891        return set;
 892}
 893
 894/**
 895 * Wind down and free request set structure previously allocated with
 896 * ptlrpc_prep_set.
 897 * Ensures that all requests on the set have completed and removes
 898 * all requests from the request list in a set.
 899 * If any unsent request happen to be on the list, pretends that they got
 900 * an error in flight and calls their completion handler.
 901 */
 902void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 903{
 904        struct list_head *tmp;
 905        struct list_head *next;
 906        int expected_phase;
 907        int n = 0;
 908
 909        /* Requests on the set should either all be completed, or all be new */
 910        expected_phase = (atomic_read(&set->set_remaining) == 0) ?
 911                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
 912        list_for_each(tmp, &set->set_requests) {
 913                struct ptlrpc_request *req =
 914                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 915
 916                LASSERT(req->rq_phase == expected_phase);
 917                n++;
 918        }
 919
 920        LASSERTF(atomic_read(&set->set_remaining) == 0 ||
 921                 atomic_read(&set->set_remaining) == n, "%d / %d\n",
 922                 atomic_read(&set->set_remaining), n);
 923
 924        list_for_each_safe(tmp, next, &set->set_requests) {
 925                struct ptlrpc_request *req =
 926                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 927                list_del_init(&req->rq_set_chain);
 928
 929                LASSERT(req->rq_phase == expected_phase);
 930
 931                if (req->rq_phase == RQ_PHASE_NEW) {
 932                        ptlrpc_req_interpret(NULL, req, -EBADR);
 933                        atomic_dec(&set->set_remaining);
 934                }
 935
 936                spin_lock(&req->rq_lock);
 937                req->rq_set = NULL;
 938                req->rq_invalid_rqset = 0;
 939                spin_unlock(&req->rq_lock);
 940
 941                ptlrpc_req_finished(req);
 942        }
 943
 944        LASSERT(atomic_read(&set->set_remaining) == 0);
 945
 946        ptlrpc_reqset_put(set);
 947}
 948EXPORT_SYMBOL(ptlrpc_set_destroy);
 949
 950/**
 951 * Add a new request to the general purpose request set.
 952 * Assumes request reference from the caller.
 953 */
 954void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
 955                        struct ptlrpc_request *req)
 956{
 957        LASSERT(list_empty(&req->rq_set_chain));
 958
 959        /* The set takes over the caller's request reference */
 960        list_add_tail(&req->rq_set_chain, &set->set_requests);
 961        req->rq_set = set;
 962        atomic_inc(&set->set_remaining);
 963        req->rq_queued_time = cfs_time_current();
 964
 965        if (req->rq_reqmsg)
 966                lustre_msg_set_jobid(req->rq_reqmsg, NULL);
 967
 968        if (set->set_producer)
 969                /*
 970                 * If the request set has a producer callback, the RPC must be
 971                 * sent straight away
 972                 */
 973                ptlrpc_send_new_req(req);
 974}
 975EXPORT_SYMBOL(ptlrpc_set_add_req);
 976
 977/**
 978 * Add a request to a request with dedicated server thread
 979 * and wake the thread to make any necessary processing.
 980 * Currently only used for ptlrpcd.
 981 */
 982void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
 983                            struct ptlrpc_request *req)
 984{
 985        struct ptlrpc_request_set *set = pc->pc_set;
 986        int count, i;
 987
 988        LASSERT(!req->rq_set);
 989        LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0);
 990
 991        spin_lock(&set->set_new_req_lock);
 992        /* The set takes over the caller's request reference.  */
 993        req->rq_set = set;
 994        req->rq_queued_time = cfs_time_current();
 995        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
 996        count = atomic_inc_return(&set->set_new_count);
 997        spin_unlock(&set->set_new_req_lock);
 998
 999        /* Only need to call wakeup once for the first entry. */
1000        if (count == 1) {
1001                wake_up(&set->set_waitq);
1002
1003                /*
1004                 * XXX: It maybe unnecessary to wakeup all the partners. But to
1005                 *      guarantee the async RPC can be processed ASAP, we have
1006                 *      no other better choice. It maybe fixed in future.
1007                 */
1008                for (i = 0; i < pc->pc_npartners; i++)
1009                        wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
1010        }
1011}
1012
1013/**
1014 * Based on the current state of the import, determine if the request
1015 * can be sent, is an error, or should be delayed.
1016 *
1017 * Returns true if this request should be delayed. If false, and
1018 * *status is set, then the request can not be sent and *status is the
1019 * error code.  If false and status is 0, then request can be sent.
1020 *
1021 * The imp->imp_lock must be held.
1022 */
1023static int ptlrpc_import_delay_req(struct obd_import *imp,
1024                                   struct ptlrpc_request *req, int *status)
1025{
1026        int delay = 0;
1027
1028        *status = 0;
1029
1030        if (req->rq_ctx_init || req->rq_ctx_fini) {
1031                /* always allow ctx init/fini rpc go through */
1032        } else if (imp->imp_state == LUSTRE_IMP_NEW) {
1033                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
1034                *status = -EIO;
1035        } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
1036                /* pings may safely race with umount */
1037                DEBUG_REQ(lustre_msg_get_opc(req->rq_reqmsg) == OBD_PING ?
1038                          D_HA : D_ERROR, req, "IMP_CLOSED ");
1039                *status = -EIO;
1040        } else if (ptlrpc_send_limit_expired(req)) {
1041                /* probably doesn't need to be a D_ERROR after initial testing */
1042                DEBUG_REQ(D_HA, req, "send limit expired ");
1043                *status = -ETIMEDOUT;
1044        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
1045                   imp->imp_state == LUSTRE_IMP_CONNECTING) {
1046                /* allow CONNECT even if import is invalid */
1047                if (atomic_read(&imp->imp_inval_count) != 0) {
1048                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1049                        *status = -EIO;
1050                }
1051        } else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) {
1052                if (!imp->imp_deactive)
1053                        DEBUG_REQ(D_NET, req, "IMP_INVALID");
1054                *status = -ESHUTDOWN; /* bz 12940 */
1055        } else if (req->rq_import_generation != imp->imp_generation) {
1056                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
1057                *status = -EIO;
1058        } else if (req->rq_send_state != imp->imp_state) {
1059                /* invalidate in progress - any requests should be drop */
1060                if (atomic_read(&imp->imp_inval_count) != 0) {
1061                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1062                        *status = -EIO;
1063                } else if (imp->imp_dlm_fake || req->rq_no_delay) {
1064                        *status = -EWOULDBLOCK;
1065                } else if (req->rq_allow_replay &&
1066                          (imp->imp_state == LUSTRE_IMP_REPLAY ||
1067                           imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
1068                           imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
1069                           imp->imp_state == LUSTRE_IMP_RECOVER)) {
1070                        DEBUG_REQ(D_HA, req, "allow during recovery.\n");
1071                } else {
1072                        delay = 1;
1073                }
1074        }
1075
1076        return delay;
1077}
1078
1079/**
1080 * Decide if the error message should be printed to the console or not.
1081 * Makes its decision based on request type, status, and failure frequency.
1082 *
1083 * \param[in] req  request that failed and may need a console message
1084 *
1085 * \retval false if no message should be printed
1086 * \retval true  if console message should be printed
1087 */
1088static bool ptlrpc_console_allow(struct ptlrpc_request *req)
1089{
1090        __u32 opc;
1091
1092        LASSERT(req->rq_reqmsg);
1093        opc = lustre_msg_get_opc(req->rq_reqmsg);
1094
1095        /* Suppress particular reconnect errors which are to be expected. */
1096        if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) {
1097                int err;
1098
1099                /* Suppress timed out reconnect requests */
1100                if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) ||
1101                    req->rq_timedout)
1102                        return false;
1103
1104                /*
1105                 * Suppress most unavailable/again reconnect requests, but
1106                 * print occasionally so it is clear client is trying to
1107                 * connect to a server where no target is running.
1108                 */
1109                err = lustre_msg_get_status(req->rq_repmsg);
1110                if ((err == -ENODEV || err == -EAGAIN) &&
1111                    req->rq_import->imp_conn_cnt % 30 != 20)
1112                        return false;
1113        }
1114
1115        return true;
1116}
1117
1118/**
1119 * Check request processing status.
1120 * Returns the status.
1121 */
1122static int ptlrpc_check_status(struct ptlrpc_request *req)
1123{
1124        int err;
1125
1126        err = lustre_msg_get_status(req->rq_repmsg);
1127        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
1128                struct obd_import *imp = req->rq_import;
1129                lnet_nid_t nid = imp->imp_connection->c_peer.nid;
1130                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1131
1132                if (ptlrpc_console_allow(req))
1133                        LCONSOLE_ERROR_MSG(0x011, "%s: operation %s to node %s failed: rc = %d\n",
1134                                           imp->imp_obd->obd_name,
1135                                           ll_opcode2str(opc),
1136                                           libcfs_nid2str(nid), err);
1137                return err < 0 ? err : -EINVAL;
1138        }
1139
1140        if (err < 0)
1141                DEBUG_REQ(D_INFO, req, "status is %d", err);
1142        else if (err > 0)
1143                /* XXX: translate this error from net to host */
1144                DEBUG_REQ(D_INFO, req, "status is %d", err);
1145
1146        return err;
1147}
1148
1149/**
1150 * save pre-versions of objects into request for replay.
1151 * Versions are obtained from server reply.
1152 * used for VBR.
1153 */
1154static void ptlrpc_save_versions(struct ptlrpc_request *req)
1155{
1156        struct lustre_msg *repmsg = req->rq_repmsg;
1157        struct lustre_msg *reqmsg = req->rq_reqmsg;
1158        __u64 *versions = lustre_msg_get_versions(repmsg);
1159
1160        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1161                return;
1162
1163        LASSERT(versions);
1164        lustre_msg_set_versions(reqmsg, versions);
1165        CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
1166               versions[0], versions[1]);
1167}
1168
1169/**
1170 * Callback function called when client receives RPC reply for \a req.
1171 * Returns 0 on success or error code.
1172 * The return value would be assigned to req->rq_status by the caller
1173 * as request processing status.
1174 * This function also decides if the request needs to be saved for later replay.
1175 */
1176static int after_reply(struct ptlrpc_request *req)
1177{
1178        struct obd_import *imp = req->rq_import;
1179        struct obd_device *obd = req->rq_import->imp_obd;
1180        int rc;
1181        struct timespec64 work_start;
1182        long timediff;
1183
1184        LASSERT(obd);
1185        /* repbuf must be unlinked */
1186        LASSERT(!req->rq_receiving_reply && req->rq_reply_unlinked);
1187
1188        if (req->rq_reply_truncated) {
1189                if (ptlrpc_no_resend(req)) {
1190                        DEBUG_REQ(D_ERROR, req, "reply buffer overflow, expected: %d, actual size: %d",
1191                                  req->rq_nob_received, req->rq_repbuf_len);
1192                        return -EOVERFLOW;
1193                }
1194
1195                sptlrpc_cli_free_repbuf(req);
1196                /*
1197                 * Pass the required reply buffer size (include space for early
1198                 * reply).  NB: no need to round up because alloc_repbuf will
1199                 * round it up
1200                 */
1201                req->rq_replen       = req->rq_nob_received;
1202                req->rq_nob_received = 0;
1203                spin_lock(&req->rq_lock);
1204                req->rq_resend       = 1;
1205                spin_unlock(&req->rq_lock);
1206                return 0;
1207        }
1208
1209        /*
1210         * NB Until this point, the whole of the incoming message,
1211         * including buflens, status etc is in the sender's byte order.
1212         */
1213        rc = sptlrpc_cli_unwrap_reply(req);
1214        if (rc) {
1215                DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
1216                return rc;
1217        }
1218
1219        /* Security layer unwrap might ask resend this request. */
1220        if (req->rq_resend)
1221                return 0;
1222
1223        rc = unpack_reply(req);
1224        if (rc)
1225                return rc;
1226
1227        /* retry indefinitely on EINPROGRESS */
1228        if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
1229            ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
1230                time64_t now = ktime_get_real_seconds();
1231
1232                DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
1233                spin_lock(&req->rq_lock);
1234                req->rq_resend = 1;
1235                spin_unlock(&req->rq_lock);
1236                req->rq_nr_resend++;
1237
1238                /* allocate new xid to avoid reply reconstruction */
1239                if (!req->rq_bulk) {
1240                        /* new xid is already allocated for bulk in ptlrpc_check_set() */
1241                        req->rq_xid = ptlrpc_next_xid();
1242                        DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS");
1243                }
1244
1245                /* Readjust the timeout for current conditions */
1246                ptlrpc_at_set_req_timeout(req);
1247                /*
1248                 * delay resend to give a chance to the server to get ready.
1249                 * The delay is increased by 1s on every resend and is capped to
1250                 * the current request timeout (i.e. obd_timeout if AT is off,
1251                 * or AT service time x 125% + 5s, see at_est2timeout)
1252                 */
1253                if (req->rq_nr_resend > req->rq_timeout)
1254                        req->rq_sent = now + req->rq_timeout;
1255                else
1256                        req->rq_sent = now + req->rq_nr_resend;
1257
1258                return 0;
1259        }
1260
1261        ktime_get_real_ts64(&work_start);
1262        timediff = (work_start.tv_sec - req->rq_sent_tv.tv_sec) * USEC_PER_SEC +
1263                   (work_start.tv_nsec - req->rq_sent_tv.tv_nsec) /
1264                                                                 NSEC_PER_USEC;
1265        if (obd->obd_svc_stats) {
1266                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
1267                                    timediff);
1268                ptlrpc_lprocfs_rpc_sent(req, timediff);
1269        }
1270
1271        if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
1272            lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
1273                DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
1274                          lustre_msg_get_type(req->rq_repmsg));
1275                return -EPROTO;
1276        }
1277
1278        if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1279                CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
1280        ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
1281        ptlrpc_at_adj_net_latency(req,
1282                                  lustre_msg_get_service_time(req->rq_repmsg));
1283
1284        rc = ptlrpc_check_status(req);
1285        imp->imp_connect_error = rc;
1286
1287        if (rc) {
1288                /*
1289                 * Either we've been evicted, or the server has failed for
1290                 * some reason. Try to reconnect, and if that fails, punt to
1291                 * the upcall.
1292                 */
1293                if (ptlrpc_recoverable_error(rc)) {
1294                        if (req->rq_send_state != LUSTRE_IMP_FULL ||
1295                            imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
1296                                return rc;
1297                        }
1298                        ptlrpc_request_handle_notconn(req);
1299                        return rc;
1300                }
1301        } else {
1302                /*
1303                 * Let's look if server sent slv. Do it only for RPC with
1304                 * rc == 0.
1305                 */
1306                ldlm_cli_update_pool(req);
1307        }
1308
1309        /* Store transno in reqmsg for replay. */
1310        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1311                req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
1312                lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
1313        }
1314
1315        if (imp->imp_replayable) {
1316                spin_lock(&imp->imp_lock);
1317                /*
1318                 * No point in adding already-committed requests to the replay
1319                 * list, we will just remove them immediately. b=9829
1320                 */
1321                if (req->rq_transno != 0 &&
1322                    (req->rq_transno >
1323                     lustre_msg_get_last_committed(req->rq_repmsg) ||
1324                     req->rq_replay)) {
1325                        /* version recovery */
1326                        ptlrpc_save_versions(req);
1327                        ptlrpc_retain_replayable_request(req, imp);
1328                } else if (req->rq_commit_cb &&
1329                           list_empty(&req->rq_replay_list)) {
1330                        /*
1331                         * NB: don't call rq_commit_cb if it's already on
1332                         * rq_replay_list, ptlrpc_free_committed() will call
1333                         * it later, see LU-3618 for details
1334                         */
1335                        spin_unlock(&imp->imp_lock);
1336                        req->rq_commit_cb(req);
1337                        spin_lock(&imp->imp_lock);
1338                }
1339
1340                /* Replay-enabled imports return commit-status information. */
1341                if (lustre_msg_get_last_committed(req->rq_repmsg)) {
1342                        imp->imp_peer_committed_transno =
1343                                lustre_msg_get_last_committed(req->rq_repmsg);
1344                }
1345
1346                ptlrpc_free_committed(imp);
1347
1348                if (!list_empty(&imp->imp_replay_list)) {
1349                        struct ptlrpc_request *last;
1350
1351                        last = list_entry(imp->imp_replay_list.prev,
1352                                          struct ptlrpc_request,
1353                                          rq_replay_list);
1354                        /*
1355                         * Requests with rq_replay stay on the list even if no
1356                         * commit is expected.
1357                         */
1358                        if (last->rq_transno > imp->imp_peer_committed_transno)
1359                                ptlrpc_pinger_commit_expected(imp);
1360                }
1361
1362                spin_unlock(&imp->imp_lock);
1363        }
1364
1365        return rc;
1366}
1367
1368/**
1369 * Helper function to send request \a req over the network for the first time
1370 * Also adjusts request phase.
1371 * Returns 0 on success or error code.
1372 */
1373static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1374{
1375        struct obd_import *imp = req->rq_import;
1376        int rc;
1377
1378        LASSERT(req->rq_phase == RQ_PHASE_NEW);
1379        if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) &&
1380            (!req->rq_generation_set ||
1381             req->rq_import_generation == imp->imp_generation))
1382                return 0;
1383
1384        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
1385
1386        spin_lock(&imp->imp_lock);
1387
1388        if (!req->rq_generation_set)
1389                req->rq_import_generation = imp->imp_generation;
1390
1391        if (ptlrpc_import_delay_req(imp, req, &rc)) {
1392                spin_lock(&req->rq_lock);
1393                req->rq_waiting = 1;
1394                spin_unlock(&req->rq_lock);
1395
1396                DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: (%s != %s)",
1397                          lustre_msg_get_status(req->rq_reqmsg),
1398                          ptlrpc_import_state_name(req->rq_send_state),
1399                          ptlrpc_import_state_name(imp->imp_state));
1400                LASSERT(list_empty(&req->rq_list));
1401                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1402                atomic_inc(&req->rq_import->imp_inflight);
1403                spin_unlock(&imp->imp_lock);
1404                return 0;
1405        }
1406
1407        if (rc != 0) {
1408                spin_unlock(&imp->imp_lock);
1409                req->rq_status = rc;
1410                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1411                return rc;
1412        }
1413
1414        LASSERT(list_empty(&req->rq_list));
1415        list_add_tail(&req->rq_list, &imp->imp_sending_list);
1416        atomic_inc(&req->rq_import->imp_inflight);
1417        spin_unlock(&imp->imp_lock);
1418
1419        lustre_msg_set_status(req->rq_reqmsg, current_pid());
1420
1421        rc = sptlrpc_req_refresh_ctx(req, -1);
1422        if (rc) {
1423                if (req->rq_err) {
1424                        req->rq_status = rc;
1425                        return 1;
1426                }
1427                spin_lock(&req->rq_lock);
1428                req->rq_wait_ctx = 1;
1429                spin_unlock(&req->rq_lock);
1430                return 0;
1431        }
1432
1433        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1434               current_comm(),
1435               imp->imp_obd->obd_uuid.uuid,
1436               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1437               libcfs_nid2str(imp->imp_connection->c_peer.nid),
1438               lustre_msg_get_opc(req->rq_reqmsg));
1439
1440        rc = ptl_send_rpc(req, 0);
1441        if (rc) {
1442                DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
1443                spin_lock(&req->rq_lock);
1444                req->rq_net_err = 1;
1445                spin_unlock(&req->rq_lock);
1446                return rc;
1447        }
1448        return 0;
1449}
1450
1451static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
1452{
1453        int remaining, rc;
1454
1455        LASSERT(set->set_producer);
1456
1457        remaining = atomic_read(&set->set_remaining);
1458
1459        /*
1460         * populate the ->set_requests list with requests until we
1461         * reach the maximum number of RPCs in flight for this set
1462         */
1463        while (atomic_read(&set->set_remaining) < set->set_max_inflight) {
1464                rc = set->set_producer(set, set->set_producer_arg);
1465                if (rc == -ENOENT) {
1466                        /* no more RPC to produce */
1467                        set->set_producer     = NULL;
1468                        set->set_producer_arg = NULL;
1469                        return 0;
1470                }
1471        }
1472
1473        return (atomic_read(&set->set_remaining) - remaining);
1474}
1475
1476/**
1477 * this sends any unsent RPCs in \a set and returns 1 if all are sent
1478 * and no more replies are expected.
1479 * (it is possible to get less replies than requests sent e.g. due to timed out
1480 * requests or requests that we had trouble to send out)
1481 *
1482 * NOTE: This function contains a potential schedule point (cond_resched()).
1483 */
1484int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
1485{
1486        struct list_head *tmp, *next;
1487        struct list_head comp_reqs;
1488        int force_timer_recalc = 0;
1489
1490        if (atomic_read(&set->set_remaining) == 0)
1491                return 1;
1492
1493        INIT_LIST_HEAD(&comp_reqs);
1494        list_for_each_safe(tmp, next, &set->set_requests) {
1495                struct ptlrpc_request *req =
1496                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1497                struct obd_import *imp = req->rq_import;
1498                int unregistered = 0;
1499                int rc = 0;
1500
1501                /*
1502                 * This schedule point is mainly for the ptlrpcd caller of this
1503                 * function.  Most ptlrpc sets are not long-lived and unbounded
1504                 * in length, but at the least the set used by the ptlrpcd is.
1505                 * Since the processing time is unbounded, we need to insert an
1506                 * explicit schedule point to make the thread well-behaved.
1507                 */
1508                cond_resched();
1509
1510                if (req->rq_phase == RQ_PHASE_NEW &&
1511                    ptlrpc_send_new_req(req)) {
1512                        force_timer_recalc = 1;
1513                }
1514
1515                /* delayed send - skip */
1516                if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1517                        continue;
1518
1519                /* delayed resend - skip */
1520                if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
1521                    req->rq_sent > ktime_get_real_seconds())
1522                        continue;
1523
1524                if (!(req->rq_phase == RQ_PHASE_RPC ||
1525                      req->rq_phase == RQ_PHASE_BULK ||
1526                      req->rq_phase == RQ_PHASE_INTERPRET ||
1527                      req->rq_phase == RQ_PHASE_UNREG_RPC ||
1528                      req->rq_phase == RQ_PHASE_UNREG_BULK ||
1529                      req->rq_phase == RQ_PHASE_COMPLETE)) {
1530                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1531                        LBUG();
1532                }
1533
1534                if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
1535                    req->rq_phase == RQ_PHASE_UNREG_BULK) {
1536                        LASSERT(req->rq_next_phase != req->rq_phase);
1537                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
1538
1539                        if (req->rq_req_deadline &&
1540                            !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
1541                                req->rq_req_deadline = 0;
1542                        if (req->rq_reply_deadline &&
1543                            !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
1544                                req->rq_reply_deadline = 0;
1545                        if (req->rq_bulk_deadline &&
1546                            !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
1547                                req->rq_bulk_deadline = 0;
1548
1549                        /*
1550                         * Skip processing until reply is unlinked. We
1551                         * can't return to pool before that and we can't
1552                         * call interpret before that. We need to make
1553                         * sure that all rdma transfers finished and will
1554                         * not corrupt any data.
1555                         */
1556                        if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
1557                            ptlrpc_client_recv_or_unlink(req))
1558                                continue;
1559                        if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
1560                            ptlrpc_client_bulk_active(req))
1561                                continue;
1562
1563                        /*
1564                         * Turn fail_loc off to prevent it from looping
1565                         * forever.
1566                         */
1567                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
1568                                OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
1569                                                     OBD_FAIL_ONCE);
1570                        }
1571                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
1572                                OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
1573                                                     OBD_FAIL_ONCE);
1574                        }
1575
1576                        /* Move to next phase if reply was successfully
1577                         * unlinked.
1578                         */
1579                        ptlrpc_rqphase_move(req, req->rq_next_phase);
1580                }
1581
1582                if (req->rq_phase == RQ_PHASE_COMPLETE) {
1583                        list_move_tail(&req->rq_set_chain, &comp_reqs);
1584                        continue;
1585                }
1586
1587                if (req->rq_phase == RQ_PHASE_INTERPRET)
1588                        goto interpret;
1589
1590                /* Note that this also will start async reply unlink. */
1591                if (req->rq_net_err && !req->rq_timedout) {
1592                        ptlrpc_expire_one_request(req, 1);
1593
1594                        /* Check if we still need to wait for unlink. */
1595                        if (ptlrpc_client_recv_or_unlink(req) ||
1596                            ptlrpc_client_bulk_active(req))
1597                                continue;
1598                        /* If there is no need to resend, fail it now. */
1599                        if (req->rq_no_resend) {
1600                                if (req->rq_status == 0)
1601                                        req->rq_status = -EIO;
1602                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1603                                goto interpret;
1604                        } else {
1605                                continue;
1606                        }
1607                }
1608
1609                if (req->rq_err) {
1610                        spin_lock(&req->rq_lock);
1611                        req->rq_replied = 0;
1612                        spin_unlock(&req->rq_lock);
1613                        if (req->rq_status == 0)
1614                                req->rq_status = -EIO;
1615                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1616                        goto interpret;
1617                }
1618
1619                /*
1620                 * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
1621                 * so it sets rq_intr regardless of individual rpc
1622                 * timeouts. The synchronous IO waiting path sets
1623                 * rq_intr irrespective of whether ptlrpcd
1624                 * has seen a timeout.  Our policy is to only interpret
1625                 * interrupted rpcs after they have timed out, so we
1626                 * need to enforce that here.
1627                 */
1628
1629                if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
1630                                     req->rq_wait_ctx)) {
1631                        req->rq_status = -EINTR;
1632                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1633                        goto interpret;
1634                }
1635
1636                if (req->rq_phase == RQ_PHASE_RPC) {
1637                        if (req->rq_timedout || req->rq_resend ||
1638                            req->rq_waiting || req->rq_wait_ctx) {
1639                                int status;
1640
1641                                if (!ptlrpc_unregister_reply(req, 1)) {
1642                                        ptlrpc_unregister_bulk(req, 1);
1643                                        continue;
1644                                }
1645
1646                                spin_lock(&imp->imp_lock);
1647                                if (ptlrpc_import_delay_req(imp, req,
1648                                                            &status)) {
1649                                        /*
1650                                         * put on delay list - only if we wait
1651                                         * recovery finished - before send
1652                                         */
1653                                        list_del_init(&req->rq_list);
1654                                        list_add_tail(&req->rq_list,
1655                                                      &imp->imp_delayed_list);
1656                                        spin_unlock(&imp->imp_lock);
1657                                        continue;
1658                                }
1659
1660                                if (status != 0) {
1661                                        req->rq_status = status;
1662                                        ptlrpc_rqphase_move(req,
1663                                                            RQ_PHASE_INTERPRET);
1664                                        spin_unlock(&imp->imp_lock);
1665                                        goto interpret;
1666                                }
1667                                if (ptlrpc_no_resend(req) &&
1668                                    !req->rq_wait_ctx) {
1669                                        req->rq_status = -ENOTCONN;
1670                                        ptlrpc_rqphase_move(req,
1671                                                            RQ_PHASE_INTERPRET);
1672                                        spin_unlock(&imp->imp_lock);
1673                                        goto interpret;
1674                                }
1675
1676                                list_del_init(&req->rq_list);
1677                                list_add_tail(&req->rq_list,
1678                                              &imp->imp_sending_list);
1679
1680                                spin_unlock(&imp->imp_lock);
1681
1682                                spin_lock(&req->rq_lock);
1683                                req->rq_waiting = 0;
1684                                spin_unlock(&req->rq_lock);
1685
1686                                if (req->rq_timedout || req->rq_resend) {
1687                                        /* This is re-sending anyway, let's mark req as resend. */
1688                                        spin_lock(&req->rq_lock);
1689                                        req->rq_resend = 1;
1690                                        spin_unlock(&req->rq_lock);
1691                                        if (req->rq_bulk) {
1692                                                __u64 old_xid;
1693
1694                                                if (!ptlrpc_unregister_bulk(req, 1))
1695                                                        continue;
1696
1697                                                /* ensure previous bulk fails */
1698                                                old_xid = req->rq_xid;
1699                                                req->rq_xid = ptlrpc_next_xid();
1700                                                CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
1701                                                       old_xid, req->rq_xid);
1702                                        }
1703                                }
1704                                /*
1705                                 * rq_wait_ctx is only touched by ptlrpcd,
1706                                 * so no lock is needed here.
1707                                 */
1708                                status = sptlrpc_req_refresh_ctx(req, -1);
1709                                if (status) {
1710                                        if (req->rq_err) {
1711                                                req->rq_status = status;
1712                                                spin_lock(&req->rq_lock);
1713                                                req->rq_wait_ctx = 0;
1714                                                spin_unlock(&req->rq_lock);
1715                                                force_timer_recalc = 1;
1716                                        } else {
1717                                                spin_lock(&req->rq_lock);
1718                                                req->rq_wait_ctx = 1;
1719                                                spin_unlock(&req->rq_lock);
1720                                        }
1721
1722                                        continue;
1723                                } else {
1724                                        spin_lock(&req->rq_lock);
1725                                        req->rq_wait_ctx = 0;
1726                                        spin_unlock(&req->rq_lock);
1727                                }
1728
1729                                rc = ptl_send_rpc(req, 0);
1730                                if (rc) {
1731                                        DEBUG_REQ(D_HA, req,
1732                                                  "send failed: rc = %d", rc);
1733                                        force_timer_recalc = 1;
1734                                        spin_lock(&req->rq_lock);
1735                                        req->rq_net_err = 1;
1736                                        spin_unlock(&req->rq_lock);
1737                                        continue;
1738                                }
1739                                /* need to reset the timeout */
1740                                force_timer_recalc = 1;
1741                        }
1742
1743                        spin_lock(&req->rq_lock);
1744
1745                        if (ptlrpc_client_early(req)) {
1746                                ptlrpc_at_recv_early_reply(req);
1747                                spin_unlock(&req->rq_lock);
1748                                continue;
1749                        }
1750
1751                        /* Still waiting for a reply? */
1752                        if (ptlrpc_client_recv(req)) {
1753                                spin_unlock(&req->rq_lock);
1754                                continue;
1755                        }
1756
1757                        /* Did we actually receive a reply? */
1758                        if (!ptlrpc_client_replied(req)) {
1759                                spin_unlock(&req->rq_lock);
1760                                continue;
1761                        }
1762
1763                        spin_unlock(&req->rq_lock);
1764
1765                        /*
1766                         * unlink from net because we are going to
1767                         * swab in-place of reply buffer
1768                         */
1769                        unregistered = ptlrpc_unregister_reply(req, 1);
1770                        if (!unregistered)
1771                                continue;
1772
1773                        req->rq_status = after_reply(req);
1774                        if (req->rq_resend)
1775                                continue;
1776
1777                        /*
1778                         * If there is no bulk associated with this request,
1779                         * then we're done and should let the interpreter
1780                         * process the reply. Similarly if the RPC returned
1781                         * an error, and therefore the bulk will never arrive.
1782                         */
1783                        if (!req->rq_bulk || req->rq_status < 0) {
1784                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1785                                goto interpret;
1786                        }
1787
1788                        ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
1789                }
1790
1791                LASSERT(req->rq_phase == RQ_PHASE_BULK);
1792                if (ptlrpc_client_bulk_active(req))
1793                        continue;
1794
1795                if (req->rq_bulk->bd_failure) {
1796                        /*
1797                         * The RPC reply arrived OK, but the bulk screwed
1798                         * up!  Dead weird since the server told us the RPC
1799                         * was good after getting the REPLY for her GET or
1800                         * the ACK for her PUT.
1801                         */
1802                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1803                        req->rq_status = -EIO;
1804                }
1805
1806                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1807
1808interpret:
1809                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
1810
1811                /*
1812                 * This moves to "unregistering" phase we need to wait for
1813                 * reply unlink.
1814                 */
1815                if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
1816                        /* start async bulk unlink too */
1817                        ptlrpc_unregister_bulk(req, 1);
1818                        continue;
1819                }
1820
1821                if (!ptlrpc_unregister_bulk(req, 1))
1822                        continue;
1823
1824                /* When calling interpret receive should already be finished. */
1825                LASSERT(!req->rq_receiving_reply);
1826
1827                ptlrpc_req_interpret(env, req, req->rq_status);
1828
1829                if (ptlrpcd_check_work(req)) {
1830                        atomic_dec(&set->set_remaining);
1831                        continue;
1832                }
1833                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
1834
1835                CDEBUG(req->rq_reqmsg ? D_RPCTRACE : 0,
1836                       "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1837                       current_comm(), imp->imp_obd->obd_uuid.uuid,
1838                       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1839                       libcfs_nid2str(imp->imp_connection->c_peer.nid),
1840                       lustre_msg_get_opc(req->rq_reqmsg));
1841
1842                spin_lock(&imp->imp_lock);
1843                /*
1844                 * Request already may be not on sending or delaying list. This
1845                 * may happen in the case of marking it erroneous for the case
1846                 * ptlrpc_import_delay_req(req, status) find it impossible to
1847                 * allow sending this rpc and returns *status != 0.
1848                 */
1849                if (!list_empty(&req->rq_list)) {
1850                        list_del_init(&req->rq_list);
1851                        atomic_dec(&imp->imp_inflight);
1852                }
1853                spin_unlock(&imp->imp_lock);
1854
1855                atomic_dec(&set->set_remaining);
1856                wake_up_all(&imp->imp_recovery_waitq);
1857
1858                if (set->set_producer) {
1859                        /* produce a new request if possible */
1860                        if (ptlrpc_set_producer(set) > 0)
1861                                force_timer_recalc = 1;
1862
1863                        /*
1864                         * free the request that has just been completed
1865                         * in order not to pollute set->set_requests
1866                         */
1867                        list_del_init(&req->rq_set_chain);
1868                        spin_lock(&req->rq_lock);
1869                        req->rq_set = NULL;
1870                        req->rq_invalid_rqset = 0;
1871                        spin_unlock(&req->rq_lock);
1872
1873                        /* record rq_status to compute the final status later */
1874                        if (req->rq_status != 0)
1875                                set->set_rc = req->rq_status;
1876                        ptlrpc_req_finished(req);
1877                } else {
1878                        list_move_tail(&req->rq_set_chain, &comp_reqs);
1879                }
1880        }
1881
1882        /*
1883         * move completed request at the head of list so it's easier for
1884         * caller to find them
1885         */
1886        list_splice(&comp_reqs, &set->set_requests);
1887
1888        /* If we hit an error, we want to recover promptly. */
1889        return atomic_read(&set->set_remaining) == 0 || force_timer_recalc;
1890}
1891EXPORT_SYMBOL(ptlrpc_check_set);
1892
1893/**
1894 * Time out request \a req. is \a async_unlink is set, that means do not wait
1895 * until LNet actually confirms network buffer unlinking.
1896 * Return 1 if we should give up further retrying attempts or 0 otherwise.
1897 */
1898int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
1899{
1900        struct obd_import *imp = req->rq_import;
1901        int rc = 0;
1902
1903        spin_lock(&req->rq_lock);
1904        req->rq_timedout = 1;
1905        spin_unlock(&req->rq_lock);
1906
1907        DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent %lld/real %lld]",
1908                  req->rq_net_err ? "failed due to network error" :
1909                     ((req->rq_real_sent == 0 ||
1910                       req->rq_real_sent < req->rq_sent ||
1911                       req->rq_real_sent >= req->rq_deadline) ?
1912                      "timed out for sent delay" : "timed out for slow reply"),
1913                  (s64)req->rq_sent, (s64)req->rq_real_sent);
1914
1915        if (imp && obd_debug_peer_on_timeout)
1916                LNetDebugPeer(imp->imp_connection->c_peer);
1917
1918        ptlrpc_unregister_reply(req, async_unlink);
1919        ptlrpc_unregister_bulk(req, async_unlink);
1920
1921        if (obd_dump_on_timeout)
1922                libcfs_debug_dumplog();
1923
1924        if (!imp) {
1925                DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1926                return 1;
1927        }
1928
1929        atomic_inc(&imp->imp_timeouts);
1930
1931        /* The DLM server doesn't want recovery run on its imports. */
1932        if (imp->imp_dlm_fake)
1933                return 1;
1934
1935        /*
1936         * If this request is for recovery or other primordial tasks,
1937         * then error it out here.
1938         */
1939        if (req->rq_ctx_init || req->rq_ctx_fini ||
1940            req->rq_send_state != LUSTRE_IMP_FULL ||
1941            imp->imp_obd->obd_no_recov) {
1942                DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1943                          ptlrpc_import_state_name(req->rq_send_state),
1944                          ptlrpc_import_state_name(imp->imp_state));
1945                spin_lock(&req->rq_lock);
1946                req->rq_status = -ETIMEDOUT;
1947                req->rq_err = 1;
1948                spin_unlock(&req->rq_lock);
1949                return 1;
1950        }
1951
1952        /*
1953         * if a request can't be resent we can't wait for an answer after
1954         * the timeout
1955         */
1956        if (ptlrpc_no_resend(req)) {
1957                DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1958                rc = 1;
1959        }
1960
1961        ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1962
1963        return rc;
1964}
1965
1966/**
1967 * Time out all uncompleted requests in request set pointed by \a data
1968 * Callback used when waiting on sets with l_wait_event.
1969 * Always returns 1.
1970 */
1971int ptlrpc_expired_set(void *data)
1972{
1973        struct ptlrpc_request_set *set = data;
1974        struct list_head *tmp;
1975        time64_t now = ktime_get_real_seconds();
1976
1977        /* A timeout expired. See which reqs it applies to...  */
1978        list_for_each(tmp, &set->set_requests) {
1979                struct ptlrpc_request *req =
1980                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1981
1982                /* don't expire request waiting for context */
1983                if (req->rq_wait_ctx)
1984                        continue;
1985
1986                /* Request in-flight? */
1987                if (!((req->rq_phase == RQ_PHASE_RPC &&
1988                       !req->rq_waiting && !req->rq_resend) ||
1989                      (req->rq_phase == RQ_PHASE_BULK)))
1990                        continue;
1991
1992                if (req->rq_timedout ||     /* already dealt with */
1993                    req->rq_deadline > now) /* not expired */
1994                        continue;
1995
1996                /*
1997                 * Deal with this guy. Do it asynchronously to not block
1998                 * ptlrpcd thread.
1999                 */
2000                ptlrpc_expire_one_request(req, 1);
2001        }
2002
2003        /*
2004         * When waiting for a whole set, we always break out of the
2005         * sleep so we can recalculate the timeout, or enable interrupts
2006         * if everyone's timed out.
2007         */
2008        return 1;
2009}
2010
2011/**
2012 * Sets rq_intr flag in \a req under spinlock.
2013 */
2014void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
2015{
2016        spin_lock(&req->rq_lock);
2017        req->rq_intr = 1;
2018        spin_unlock(&req->rq_lock);
2019}
2020EXPORT_SYMBOL(ptlrpc_mark_interrupted);
2021
2022/**
2023 * Interrupts (sets interrupted flag) all uncompleted requests in
2024 * a set \a data. Callback for l_wait_event for interruptible waits.
2025 */
2026static void ptlrpc_interrupted_set(void *data)
2027{
2028        struct ptlrpc_request_set *set = data;
2029        struct list_head *tmp;
2030
2031        CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
2032
2033        list_for_each(tmp, &set->set_requests) {
2034                struct ptlrpc_request *req =
2035                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2036
2037                if (req->rq_phase != RQ_PHASE_RPC &&
2038                    req->rq_phase != RQ_PHASE_UNREG_RPC)
2039                        continue;
2040
2041                ptlrpc_mark_interrupted(req);
2042        }
2043}
2044
2045/**
2046 * Get the smallest timeout in the set; this does NOT set a timeout.
2047 */
2048int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
2049{
2050        struct list_head *tmp;
2051        time64_t now = ktime_get_real_seconds();
2052        int timeout = 0;
2053        struct ptlrpc_request *req;
2054        time64_t deadline;
2055
2056        list_for_each(tmp, &set->set_requests) {
2057                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2058
2059                /* Request in-flight? */
2060                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
2061                      (req->rq_phase == RQ_PHASE_BULK) ||
2062                      (req->rq_phase == RQ_PHASE_NEW)))
2063                        continue;
2064
2065                /* Already timed out. */
2066                if (req->rq_timedout)
2067                        continue;
2068
2069                /* Waiting for ctx. */
2070                if (req->rq_wait_ctx)
2071                        continue;
2072
2073                if (req->rq_phase == RQ_PHASE_NEW)
2074                        deadline = req->rq_sent;
2075                else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
2076                        deadline = req->rq_sent;
2077                else
2078                        deadline = req->rq_sent + req->rq_timeout;
2079
2080                if (deadline <= now)    /* actually expired already */
2081                        timeout = 1;    /* ASAP */
2082                else if (timeout == 0 || timeout > deadline - now)
2083                        timeout = deadline - now;
2084        }
2085        return timeout;
2086}
2087
2088/**
2089 * Send all unset request from the set and then wait until all
2090 * requests in the set complete (either get a reply, timeout, get an
2091 * error or otherwise be interrupted).
2092 * Returns 0 on success or error code otherwise.
2093 */
2094int ptlrpc_set_wait(struct ptlrpc_request_set *set)
2095{
2096        struct list_head *tmp;
2097        struct ptlrpc_request *req;
2098        struct l_wait_info lwi;
2099        int rc, timeout;
2100
2101        if (set->set_producer)
2102                (void)ptlrpc_set_producer(set);
2103        else
2104                list_for_each(tmp, &set->set_requests) {
2105                        req = list_entry(tmp, struct ptlrpc_request,
2106                                         rq_set_chain);
2107                        if (req->rq_phase == RQ_PHASE_NEW)
2108                                (void)ptlrpc_send_new_req(req);
2109                }
2110
2111        if (list_empty(&set->set_requests))
2112                return 0;
2113
2114        do {
2115                timeout = ptlrpc_set_next_timeout(set);
2116
2117                /*
2118                 * wait until all complete, interrupted, or an in-flight
2119                 * req times out
2120                 */
2121                CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
2122                       set, timeout);
2123
2124                if (timeout == 0 && !signal_pending(current))
2125                        /*
2126                         * No requests are in-flight (ether timed out
2127                         * or delayed), so we can allow interrupts.
2128                         * We still want to block for a limited time,
2129                         * so we allow interrupts during the timeout.
2130                         */
2131                        lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
2132                                                   ptlrpc_expired_set,
2133                                                   ptlrpc_interrupted_set, set);
2134                else
2135                        /*
2136                         * At least one request is in flight, so no
2137                         * interrupts are allowed. Wait until all
2138                         * complete, or an in-flight req times out.
2139                         */
2140                        lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
2141                                          ptlrpc_expired_set, set);
2142
2143                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
2144
2145                /*
2146                 * LU-769 - if we ignored the signal because it was already
2147                 * pending when we started, we need to handle it now or we risk
2148                 * it being ignored forever
2149                 */
2150                if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
2151                    signal_pending(current)) {
2152                        sigset_t blocked_sigs =
2153                                           cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
2154
2155                        /*
2156                         * In fact we only interrupt for the "fatal" signals
2157                         * like SIGINT or SIGKILL. We still ignore less
2158                         * important signals since ptlrpc set is not easily
2159                         * reentrant from userspace again
2160                         */
2161                        if (signal_pending(current))
2162                                ptlrpc_interrupted_set(set);
2163                        cfs_restore_sigs(blocked_sigs);
2164                }
2165
2166                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
2167
2168                /*
2169                 * -EINTR => all requests have been flagged rq_intr so next
2170                 * check completes.
2171                 * -ETIMEDOUT => someone timed out.  When all reqs have
2172                 * timed out, signals are enabled allowing completion with
2173                 * EINTR.
2174                 * I don't really care if we go once more round the loop in
2175                 * the error cases -eeb.
2176                 */
2177                if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
2178                        list_for_each(tmp, &set->set_requests) {
2179                                req = list_entry(tmp, struct ptlrpc_request,
2180                                                 rq_set_chain);
2181                                spin_lock(&req->rq_lock);
2182                                req->rq_invalid_rqset = 1;
2183                                spin_unlock(&req->rq_lock);
2184                        }
2185                }
2186        } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
2187
2188        LASSERT(atomic_read(&set->set_remaining) == 0);
2189
2190        rc = set->set_rc; /* rq_status of already freed requests if any */
2191        list_for_each(tmp, &set->set_requests) {
2192                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2193
2194                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
2195                if (req->rq_status != 0)
2196                        rc = req->rq_status;
2197        }
2198
2199        if (set->set_interpret) {
2200                int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
2201                        set->set_interpret;
2202                rc = interpreter(set, set->set_arg, rc);
2203        } else {
2204                struct ptlrpc_set_cbdata *cbdata, *n;
2205                int err;
2206
2207                list_for_each_entry_safe(cbdata, n,
2208                                         &set->set_cblist, psc_item) {
2209                        list_del_init(&cbdata->psc_item);
2210                        err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
2211                        if (err && !rc)
2212                                rc = err;
2213                        kfree(cbdata);
2214                }
2215        }
2216
2217        return rc;
2218}
2219EXPORT_SYMBOL(ptlrpc_set_wait);
2220
2221/**
2222 * Helper function for request freeing.
2223 * Called when request count reached zero and request needs to be freed.
2224 * Removes request from all sorts of sending/replay lists it might be on,
2225 * frees network buffers if any are present.
2226 * If \a locked is set, that means caller is already holding import imp_lock
2227 * and so we no longer need to reobtain it (for certain lists manipulations)
2228 */
2229static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
2230{
2231        if (!request)
2232                return;
2233        LASSERT(!request->rq_srv_req);
2234        LASSERT(!request->rq_export);
2235        LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
2236        LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
2237        LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
2238        LASSERTF(!request->rq_replay, "req %p\n", request);
2239
2240        req_capsule_fini(&request->rq_pill);
2241
2242        /*
2243         * We must take it off the imp_replay_list first.  Otherwise, we'll set
2244         * request->rq_reqmsg to NULL while osc_close is dereferencing it.
2245         */
2246        if (request->rq_import) {
2247                if (!locked)
2248                        spin_lock(&request->rq_import->imp_lock);
2249                list_del_init(&request->rq_replay_list);
2250                if (!locked)
2251                        spin_unlock(&request->rq_import->imp_lock);
2252        }
2253        LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
2254
2255        if (atomic_read(&request->rq_refcount) != 0) {
2256                DEBUG_REQ(D_ERROR, request,
2257                          "freeing request with nonzero refcount");
2258                LBUG();
2259        }
2260
2261        if (request->rq_repbuf)
2262                sptlrpc_cli_free_repbuf(request);
2263
2264        if (request->rq_import) {
2265                class_import_put(request->rq_import);
2266                request->rq_import = NULL;
2267        }
2268        if (request->rq_bulk)
2269                ptlrpc_free_bulk_pin(request->rq_bulk);
2270
2271        if (request->rq_reqbuf || request->rq_clrbuf)
2272                sptlrpc_cli_free_reqbuf(request);
2273
2274        if (request->rq_cli_ctx)
2275                sptlrpc_req_put_ctx(request, !locked);
2276
2277        if (request->rq_pool)
2278                __ptlrpc_free_req_to_pool(request);
2279        else
2280                ptlrpc_request_cache_free(request);
2281}
2282
2283/**
2284 * Helper function
2285 * Drops one reference count for request \a request.
2286 * \a locked set indicates that caller holds import imp_lock.
2287 * Frees the request when reference count reaches zero.
2288 */
2289static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
2290{
2291        if (!request)
2292                return 1;
2293
2294        if (request == LP_POISON ||
2295            request->rq_reqmsg == LP_POISON) {
2296                CERROR("dereferencing freed request (bug 575)\n");
2297                LBUG();
2298                return 1;
2299        }
2300
2301        DEBUG_REQ(D_INFO, request, "refcount now %u",
2302                  atomic_read(&request->rq_refcount) - 1);
2303
2304        if (atomic_dec_and_test(&request->rq_refcount)) {
2305                __ptlrpc_free_req(request, locked);
2306                return 1;
2307        }
2308
2309        return 0;
2310}
2311
2312/**
2313 * Drops one reference count for a request.
2314 */
2315void ptlrpc_req_finished(struct ptlrpc_request *request)
2316{
2317        __ptlrpc_req_finished(request, 0);
2318}
2319EXPORT_SYMBOL(ptlrpc_req_finished);
2320
2321/**
2322 * Returns xid of a \a request
2323 */
2324__u64 ptlrpc_req_xid(struct ptlrpc_request *request)
2325{
2326        return request->rq_xid;
2327}
2328EXPORT_SYMBOL(ptlrpc_req_xid);
2329
2330/**
2331 * Disengage the client's reply buffer from the network
2332 * NB does _NOT_ unregister any client-side bulk.
2333 * IDEMPOTENT, but _not_ safe against concurrent callers.
2334 * The request owner (i.e. the thread doing the I/O) must call...
2335 * Returns 0 on success or 1 if unregistering cannot be made.
2336 */
2337static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
2338{
2339        int rc;
2340        wait_queue_head_t *wq;
2341        struct l_wait_info lwi;
2342
2343        /* Might sleep. */
2344        LASSERT(!in_interrupt());
2345
2346        /* Let's setup deadline for reply unlink. */
2347        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2348            async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
2349                request->rq_reply_deadline =
2350                        ktime_get_real_seconds() + LONG_UNLINK;
2351
2352        /* Nothing left to do. */
2353        if (!ptlrpc_client_recv_or_unlink(request))
2354                return 1;
2355
2356        LNetMDUnlink(request->rq_reply_md_h);
2357
2358        /* Let's check it once again. */
2359        if (!ptlrpc_client_recv_or_unlink(request))
2360                return 1;
2361
2362        /* Move to "Unregistering" phase as reply was not unlinked yet. */
2363        ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
2364
2365        /* Do not wait for unlink to finish. */
2366        if (async)
2367                return 0;
2368
2369        /*
2370         * We have to l_wait_event() whatever the result, to give liblustre
2371         * a chance to run reply_in_callback(), and to make sure we've
2372         * unlinked before returning a req to the pool.
2373         */
2374        if (request->rq_set)
2375                wq = &request->rq_set->set_waitq;
2376        else
2377                wq = &request->rq_reply_waitq;
2378
2379        for (;;) {
2380                /*
2381                 * Network access will complete in finite time but the HUGE
2382                 * timeout lets us CWARN for visibility of sluggish NALs
2383                 */
2384                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
2385                                           cfs_time_seconds(1), NULL, NULL);
2386                rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
2387                                  &lwi);
2388                if (rc == 0) {
2389                        ptlrpc_rqphase_move(request, request->rq_next_phase);
2390                        return 1;
2391                }
2392
2393                LASSERT(rc == -ETIMEDOUT);
2394                DEBUG_REQ(D_WARNING, request,
2395                          "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
2396                          request->rq_receiving_reply,
2397                          request->rq_req_unlinked,
2398                          request->rq_reply_unlinked);
2399        }
2400        return 0;
2401}
2402
2403static void ptlrpc_free_request(struct ptlrpc_request *req)
2404{
2405        spin_lock(&req->rq_lock);
2406        req->rq_replay = 0;
2407        spin_unlock(&req->rq_lock);
2408
2409        if (req->rq_commit_cb)
2410                req->rq_commit_cb(req);
2411        list_del_init(&req->rq_replay_list);
2412
2413        __ptlrpc_req_finished(req, 1);
2414}
2415
2416/**
2417 * the request is committed and dropped from the replay list of its import
2418 */
2419void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
2420{
2421        struct obd_import       *imp = req->rq_import;
2422
2423        spin_lock(&imp->imp_lock);
2424        if (list_empty(&req->rq_replay_list)) {
2425                spin_unlock(&imp->imp_lock);
2426                return;
2427        }
2428
2429        if (force || req->rq_transno <= imp->imp_peer_committed_transno)
2430                ptlrpc_free_request(req);
2431
2432        spin_unlock(&imp->imp_lock);
2433}
2434EXPORT_SYMBOL(ptlrpc_request_committed);
2435
2436/**
2437 * Iterates through replay_list on import and prunes
2438 * all requests have transno smaller than last_committed for the
2439 * import and don't have rq_replay set.
2440 * Since requests are sorted in transno order, stops when meeting first
2441 * transno bigger than last_committed.
2442 * caller must hold imp->imp_lock
2443 */
2444void ptlrpc_free_committed(struct obd_import *imp)
2445{
2446        struct ptlrpc_request *req, *saved;
2447        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
2448        bool skip_committed_list = true;
2449
2450        assert_spin_locked(&imp->imp_lock);
2451
2452        if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
2453            imp->imp_generation == imp->imp_last_generation_checked) {
2454                CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
2455                       imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
2456                return;
2457        }
2458        CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
2459               imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
2460               imp->imp_generation);
2461
2462        if (imp->imp_generation != imp->imp_last_generation_checked ||
2463            !imp->imp_last_transno_checked)
2464                skip_committed_list = false;
2465
2466        imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
2467        imp->imp_last_generation_checked = imp->imp_generation;
2468
2469        list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
2470                                 rq_replay_list) {
2471                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
2472                LASSERT(req != last_req);
2473                last_req = req;
2474
2475                if (req->rq_transno == 0) {
2476                        DEBUG_REQ(D_EMERG, req, "zero transno during replay");
2477                        LBUG();
2478                }
2479                if (req->rq_import_generation < imp->imp_generation) {
2480                        DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
2481                        goto free_req;
2482                }
2483
2484                /* not yet committed */
2485                if (req->rq_transno > imp->imp_peer_committed_transno) {
2486                        DEBUG_REQ(D_RPCTRACE, req, "stopping search");
2487                        break;
2488                }
2489
2490                if (req->rq_replay) {
2491                        DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
2492                        list_move_tail(&req->rq_replay_list,
2493                                       &imp->imp_committed_list);
2494                        continue;
2495                }
2496
2497                DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
2498                          imp->imp_peer_committed_transno);
2499free_req:
2500                ptlrpc_free_request(req);
2501        }
2502        if (skip_committed_list)
2503                return;
2504
2505        list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
2506                                 rq_replay_list) {
2507                LASSERT(req->rq_transno != 0);
2508                if (req->rq_import_generation < imp->imp_generation) {
2509                        DEBUG_REQ(D_RPCTRACE, req, "free stale open request");
2510                        ptlrpc_free_request(req);
2511                } else if (!req->rq_replay) {
2512                        DEBUG_REQ(D_RPCTRACE, req, "free closed open request");
2513                        ptlrpc_free_request(req);
2514                }
2515        }
2516}
2517
2518/**
2519 * Schedule previously sent request for resend.
2520 * For bulk requests we assign new xid (to avoid problems with
2521 * lost replies and therefore several transfers landing into same buffer
2522 * from different sending attempts).
2523 */
2524void ptlrpc_resend_req(struct ptlrpc_request *req)
2525{
2526        DEBUG_REQ(D_HA, req, "going to resend");
2527        spin_lock(&req->rq_lock);
2528
2529        /*
2530         * Request got reply but linked to the import list still.
2531         * Let ptlrpc_check_set() to process it.
2532         */
2533        if (ptlrpc_client_replied(req)) {
2534                spin_unlock(&req->rq_lock);
2535                DEBUG_REQ(D_HA, req, "it has reply, so skip it");
2536                return;
2537        }
2538
2539        lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
2540        req->rq_status = -EAGAIN;
2541
2542        req->rq_resend = 1;
2543        req->rq_net_err = 0;
2544        req->rq_timedout = 0;
2545        if (req->rq_bulk) {
2546                __u64 old_xid = req->rq_xid;
2547
2548                /* ensure previous bulk fails */
2549                req->rq_xid = ptlrpc_next_xid();
2550                CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
2551                       old_xid, req->rq_xid);
2552        }
2553        ptlrpc_client_wake_req(req);
2554        spin_unlock(&req->rq_lock);
2555}
2556
2557/**
2558 * Grab additional reference on a request \a req
2559 */
2560struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
2561{
2562        atomic_inc(&req->rq_refcount);
2563        return req;
2564}
2565EXPORT_SYMBOL(ptlrpc_request_addref);
2566
2567/**
2568 * Add a request to import replay_list.
2569 * Must be called under imp_lock
2570 */
2571void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
2572                                      struct obd_import *imp)
2573{
2574        struct list_head *tmp;
2575
2576        assert_spin_locked(&imp->imp_lock);
2577
2578        if (req->rq_transno == 0) {
2579                DEBUG_REQ(D_EMERG, req, "saving request with zero transno");
2580                LBUG();
2581        }
2582
2583        /*
2584         * clear this for new requests that were resent as well
2585         * as resent replayed requests.
2586         */
2587        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2588
2589        /* don't re-add requests that have been replayed */
2590        if (!list_empty(&req->rq_replay_list))
2591                return;
2592
2593        lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
2594
2595        LASSERT(imp->imp_replayable);
2596        /* Balanced in ptlrpc_free_committed, usually. */
2597        ptlrpc_request_addref(req);
2598        list_for_each_prev(tmp, &imp->imp_replay_list) {
2599                struct ptlrpc_request *iter =
2600                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);
2601
2602                /*
2603                 * We may have duplicate transnos if we create and then
2604                 * open a file, or for closes retained if to match creating
2605                 * opens, so use req->rq_xid as a secondary key.
2606                 * (See bugs 684, 685, and 428.)
2607                 * XXX no longer needed, but all opens need transnos!
2608                 */
2609                if (iter->rq_transno > req->rq_transno)
2610                        continue;
2611
2612                if (iter->rq_transno == req->rq_transno) {
2613                        LASSERT(iter->rq_xid != req->rq_xid);
2614                        if (iter->rq_xid > req->rq_xid)
2615                                continue;
2616                }
2617
2618                list_add(&req->rq_replay_list, &iter->rq_replay_list);
2619                return;
2620        }
2621
2622        list_add(&req->rq_replay_list, &imp->imp_replay_list);
2623}
2624
2625/**
2626 * Send request and wait until it completes.
2627 * Returns request processing status.
2628 */
2629int ptlrpc_queue_wait(struct ptlrpc_request *req)
2630{
2631        struct ptlrpc_request_set *set;
2632        int rc;
2633
2634        LASSERT(!req->rq_set);
2635        LASSERT(!req->rq_receiving_reply);
2636
2637        set = ptlrpc_prep_set();
2638        if (!set) {
2639                CERROR("cannot allocate ptlrpc set: rc = %d\n", -ENOMEM);
2640                return -ENOMEM;
2641        }
2642
2643        /* for distributed debugging */
2644        lustre_msg_set_status(req->rq_reqmsg, current_pid());
2645
2646        /* add a ref for the set (see comment in ptlrpc_set_add_req) */
2647        ptlrpc_request_addref(req);
2648        ptlrpc_set_add_req(set, req);
2649        rc = ptlrpc_set_wait(set);
2650        ptlrpc_set_destroy(set);
2651
2652        return rc;
2653}
2654EXPORT_SYMBOL(ptlrpc_queue_wait);
2655
2656/**
2657 * Callback used for replayed requests reply processing.
2658 * In case of successful reply calls registered request replay callback.
2659 * In case of error restart replay process.
2660 */
2661static int ptlrpc_replay_interpret(const struct lu_env *env,
2662                                   struct ptlrpc_request *req,
2663                                   void *data, int rc)
2664{
2665        struct ptlrpc_replay_async_args *aa = data;
2666        struct obd_import *imp = req->rq_import;
2667
2668        atomic_dec(&imp->imp_replay_inflight);
2669
2670        if (!ptlrpc_client_replied(req)) {
2671                CERROR("request replay timed out, restarting recovery\n");
2672                rc = -ETIMEDOUT;
2673                goto out;
2674        }
2675
2676        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
2677            (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
2678             lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) {
2679                rc = lustre_msg_get_status(req->rq_repmsg);
2680                goto out;
2681        }
2682
2683        /** VBR: check version failure */
2684        if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
2685                /** replay was failed due to version mismatch */
2686                DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
2687                spin_lock(&imp->imp_lock);
2688                imp->imp_vbr_failed = 1;
2689                imp->imp_no_lock_replay = 1;
2690                spin_unlock(&imp->imp_lock);
2691                lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2692        } else {
2693                /** The transno had better not change over replay. */
2694                LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
2695                         lustre_msg_get_transno(req->rq_repmsg) ||
2696                         lustre_msg_get_transno(req->rq_repmsg) == 0,
2697                         "%#llx/%#llx\n",
2698                         lustre_msg_get_transno(req->rq_reqmsg),
2699                         lustre_msg_get_transno(req->rq_repmsg));
2700        }
2701
2702        spin_lock(&imp->imp_lock);
2703        /** if replays by version then gap occur on server, no trust to locks */
2704        if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
2705                imp->imp_no_lock_replay = 1;
2706        imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
2707        spin_unlock(&imp->imp_lock);
2708        LASSERT(imp->imp_last_replay_transno);
2709
2710        /* transaction number shouldn't be bigger than the latest replayed */
2711        if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
2712                DEBUG_REQ(D_ERROR, req,
2713                          "Reported transno %llu is bigger than the replayed one: %llu",
2714                          req->rq_transno,
2715                          lustre_msg_get_transno(req->rq_reqmsg));
2716                rc = -EINVAL;
2717                goto out;
2718        }
2719
2720        DEBUG_REQ(D_HA, req, "got rep");
2721
2722        /* let the callback do fixups, possibly including in the request */
2723        if (req->rq_replay_cb)
2724                req->rq_replay_cb(req);
2725
2726        if (ptlrpc_client_replied(req) &&
2727            lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
2728                DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
2729                          lustre_msg_get_status(req->rq_repmsg),
2730                          aa->praa_old_status);
2731        } else {
2732                /* Put it back for re-replay. */
2733                lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2734        }
2735
2736        /*
2737         * Errors while replay can set transno to 0, but
2738         * imp_last_replay_transno shouldn't be set to 0 anyway
2739         */
2740        if (req->rq_transno == 0)
2741                CERROR("Transno is 0 during replay!\n");
2742
2743        /* continue with recovery */
2744        rc = ptlrpc_import_recovery_state_machine(imp);
2745 out:
2746        req->rq_send_state = aa->praa_old_state;
2747
2748        if (rc != 0)
2749                /* this replay failed, so restart recovery */
2750                ptlrpc_connect_import(imp);
2751
2752        return rc;
2753}
2754
2755/**
2756 * Prepares and queues request for replay.
2757 * Adds it to ptlrpcd queue for actual sending.
2758 * Returns 0 on success.
2759 */
2760int ptlrpc_replay_req(struct ptlrpc_request *req)
2761{
2762        struct ptlrpc_replay_async_args *aa;
2763
2764        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
2765
2766        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2767        aa = ptlrpc_req_async_args(req);
2768        memset(aa, 0, sizeof(*aa));
2769
2770        /* Prepare request to be resent with ptlrpcd */
2771        aa->praa_old_state = req->rq_send_state;
2772        req->rq_send_state = LUSTRE_IMP_REPLAY;
2773        req->rq_phase = RQ_PHASE_NEW;
2774        req->rq_next_phase = RQ_PHASE_UNDEFINED;
2775        if (req->rq_repmsg)
2776                aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
2777        req->rq_status = 0;
2778        req->rq_interpret_reply = ptlrpc_replay_interpret;
2779        /* Readjust the timeout for current conditions */
2780        ptlrpc_at_set_req_timeout(req);
2781
2782        /*
2783         * Tell server the net_latency, so the server can calculate how long
2784         * it should wait for next replay
2785         */
2786        lustre_msg_set_service_time(req->rq_reqmsg,
2787                                    ptlrpc_at_get_net_latency(req));
2788        DEBUG_REQ(D_HA, req, "REPLAY");
2789
2790        atomic_inc(&req->rq_import->imp_replay_inflight);
2791        ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
2792
2793        ptlrpcd_add_req(req);
2794        return 0;
2795}
2796
2797/**
2798 * Aborts all in-flight request on import \a imp sending and delayed lists
2799 */
2800void ptlrpc_abort_inflight(struct obd_import *imp)
2801{
2802        struct list_head *tmp, *n;
2803
2804        /*
2805         * Make sure that no new requests get processed for this import.
2806         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
2807         * this flag and then putting requests on sending_list or delayed_list.
2808         */
2809        spin_lock(&imp->imp_lock);
2810
2811        /*
2812         * XXX locking?  Maybe we should remove each request with the list
2813         * locked?  Also, how do we know if the requests on the list are
2814         * being freed at this time?
2815         */
2816        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
2817                struct ptlrpc_request *req =
2818                        list_entry(tmp, struct ptlrpc_request, rq_list);
2819
2820                DEBUG_REQ(D_RPCTRACE, req, "inflight");
2821
2822                spin_lock(&req->rq_lock);
2823                if (req->rq_import_generation < imp->imp_generation) {
2824                        req->rq_err = 1;
2825                        req->rq_status = -EIO;
2826                        ptlrpc_client_wake_req(req);
2827                }
2828                spin_unlock(&req->rq_lock);
2829        }
2830
2831        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
2832                struct ptlrpc_request *req =
2833                        list_entry(tmp, struct ptlrpc_request, rq_list);
2834
2835                DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
2836
2837                spin_lock(&req->rq_lock);
2838                if (req->rq_import_generation < imp->imp_generation) {
2839                        req->rq_err = 1;
2840                        req->rq_status = -EIO;
2841                        ptlrpc_client_wake_req(req);
2842                }
2843                spin_unlock(&req->rq_lock);
2844        }
2845
2846        /*
2847         * Last chance to free reqs left on the replay list, but we
2848         * will still leak reqs that haven't committed.
2849         */
2850        if (imp->imp_replayable)
2851                ptlrpc_free_committed(imp);
2852
2853        spin_unlock(&imp->imp_lock);
2854}
2855
2856/**
2857 * Abort all uncompleted requests in request set \a set
2858 */
2859void ptlrpc_abort_set(struct ptlrpc_request_set *set)
2860{
2861        struct list_head *tmp, *pos;
2862
2863        list_for_each_safe(pos, tmp, &set->set_requests) {
2864                struct ptlrpc_request *req =
2865                        list_entry(pos, struct ptlrpc_request, rq_set_chain);
2866
2867                spin_lock(&req->rq_lock);
2868                if (req->rq_phase != RQ_PHASE_RPC) {
2869                        spin_unlock(&req->rq_lock);
2870                        continue;
2871                }
2872
2873                req->rq_err = 1;
2874                req->rq_status = -EINTR;
2875                ptlrpc_client_wake_req(req);
2876                spin_unlock(&req->rq_lock);
2877        }
2878}
2879
2880static __u64 ptlrpc_last_xid;
2881static spinlock_t ptlrpc_last_xid_lock;
2882
2883/**
2884 * Initialize the XID for the node.  This is common among all requests on
2885 * this node, and only requires the property that it is monotonically
2886 * increasing.  It does not need to be sequential.  Since this is also used
2887 * as the RDMA match bits, it is important that a single client NOT have
2888 * the same match bits for two different in-flight requests, hence we do
2889 * NOT want to have an XID per target or similar.
2890 *
2891 * To avoid an unlikely collision between match bits after a client reboot
2892 * (which would deliver old data into the wrong RDMA buffer) initialize
2893 * the XID based on the current time, assuming a maximum RPC rate of 1M RPC/s.
2894 * If the time is clearly incorrect, we instead use a 62-bit random number.
2895 * In the worst case the random number will overflow 1M RPCs per second in
2896 * 9133 years, or permutations thereof.
2897 */
2898#define YEAR_2004 (1ULL << 30)
2899void ptlrpc_init_xid(void)
2900{
2901        time64_t now = ktime_get_real_seconds();
2902
2903        spin_lock_init(&ptlrpc_last_xid_lock);
2904        if (now < YEAR_2004) {
2905                cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
2906                ptlrpc_last_xid >>= 2;
2907                ptlrpc_last_xid |= (1ULL << 61);
2908        } else {
2909                ptlrpc_last_xid = (__u64)now << 20;
2910        }
2911
2912        /* Always need to be aligned to a power-of-two for multi-bulk BRW */
2913        CLASSERT(((PTLRPC_BULK_OPS_COUNT - 1) & PTLRPC_BULK_OPS_COUNT) == 0);
2914        ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
2915}
2916
2917/**
2918 * Increase xid and returns resulting new value to the caller.
2919 *
2920 * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
2921 * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
2922 * itself uses the last bulk xid needed, so the server can determine the
2923 * the number of bulk transfers from the RPC XID and a bitmask.  The starting
2924 * xid must align to a power-of-two value.
2925 *
2926 * This is assumed to be true due to the initial ptlrpc_last_xid
2927 * value also being initialized to a power-of-two value. LU-1431
2928 */
2929__u64 ptlrpc_next_xid(void)
2930{
2931        __u64 next;
2932
2933        spin_lock(&ptlrpc_last_xid_lock);
2934        next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2935        ptlrpc_last_xid = next;
2936        spin_unlock(&ptlrpc_last_xid_lock);
2937
2938        return next;
2939}
2940
2941/**
2942 * Get a glimpse at what next xid value might have been.
2943 * Returns possible next xid.
2944 */
2945__u64 ptlrpc_sample_next_xid(void)
2946{
2947#if BITS_PER_LONG == 32
2948        /* need to avoid possible word tearing on 32-bit systems */
2949        __u64 next;
2950
2951        spin_lock(&ptlrpc_last_xid_lock);
2952        next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2953        spin_unlock(&ptlrpc_last_xid_lock);
2954
2955        return next;
2956#else
2957        /* No need to lock, since returned value is racy anyways */
2958        return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2959#endif
2960}
2961EXPORT_SYMBOL(ptlrpc_sample_next_xid);
2962
2963/**
2964 * Functions for operating ptlrpc workers.
2965 *
2966 * A ptlrpc work is a function which will be running inside ptlrpc context.
2967 * The callback shouldn't sleep otherwise it will block that ptlrpcd thread.
2968 *
2969 * 1. after a work is created, it can be used many times, that is:
2970 *       handler = ptlrpcd_alloc_work();
2971 *       ptlrpcd_queue_work();
2972 *
2973 *    queue it again when necessary:
2974 *       ptlrpcd_queue_work();
2975 *       ptlrpcd_destroy_work();
2976 * 2. ptlrpcd_queue_work() can be called by multiple processes meanwhile, but
2977 *    it will only be queued once in any time. Also as its name implies, it may
2978 *    have delay before it really runs by ptlrpcd thread.
2979 */
2980struct ptlrpc_work_async_args {
2981        int (*cb)(const struct lu_env *, void *);
2982        void *cbdata;
2983};
2984
2985static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
2986{
2987        /* re-initialize the req */
2988        req->rq_timeout         = obd_timeout;
2989        req->rq_sent            = ktime_get_real_seconds();
2990        req->rq_deadline        = req->rq_sent + req->rq_timeout;
2991        req->rq_phase           = RQ_PHASE_INTERPRET;
2992        req->rq_next_phase      = RQ_PHASE_COMPLETE;
2993        req->rq_xid             = ptlrpc_next_xid();
2994        req->rq_import_generation = req->rq_import->imp_generation;
2995
2996        ptlrpcd_add_req(req);
2997}
2998
2999static int work_interpreter(const struct lu_env *env,
3000                            struct ptlrpc_request *req, void *data, int rc)
3001{
3002        struct ptlrpc_work_async_args *arg = data;
3003
3004        LASSERT(ptlrpcd_check_work(req));
3005
3006        rc = arg->cb(env, arg->cbdata);
3007
3008        list_del_init(&req->rq_set_chain);
3009        req->rq_set = NULL;
3010
3011        if (atomic_dec_return(&req->rq_refcount) > 1) {
3012                atomic_set(&req->rq_refcount, 2);
3013                ptlrpcd_add_work_req(req);
3014        }
3015        return rc;
3016}
3017
3018static int worker_format;
3019
3020static int ptlrpcd_check_work(struct ptlrpc_request *req)
3021{
3022        return req->rq_pill.rc_fmt == (void *)&worker_format;
3023}
3024
3025/**
3026 * Create a work for ptlrpc.
3027 */
3028void *ptlrpcd_alloc_work(struct obd_import *imp,
3029                         int (*cb)(const struct lu_env *, void *), void *cbdata)
3030{
3031        struct ptlrpc_request    *req = NULL;
3032        struct ptlrpc_work_async_args *args;
3033
3034        might_sleep();
3035
3036        if (!cb)
3037                return ERR_PTR(-EINVAL);
3038
3039        /* copy some code from deprecated fakereq. */
3040        req = ptlrpc_request_cache_alloc(GFP_NOFS);
3041        if (!req) {
3042                CERROR("ptlrpc: run out of memory!\n");
3043                return ERR_PTR(-ENOMEM);
3044        }
3045
3046        ptlrpc_cli_req_init(req);
3047
3048        req->rq_send_state = LUSTRE_IMP_FULL;
3049        req->rq_type = PTL_RPC_MSG_REQUEST;
3050        req->rq_import = class_import_get(imp);
3051        req->rq_interpret_reply = work_interpreter;
3052        /* don't want reply */
3053        req->rq_no_delay = 1;
3054        req->rq_no_resend = 1;
3055        req->rq_pill.rc_fmt = (void *)&worker_format;
3056
3057        CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
3058        args = ptlrpc_req_async_args(req);
3059        args->cb = cb;
3060        args->cbdata = cbdata;
3061
3062        return req;
3063}
3064EXPORT_SYMBOL(ptlrpcd_alloc_work);
3065
3066void ptlrpcd_destroy_work(void *handler)
3067{
3068        struct ptlrpc_request *req = handler;
3069
3070        if (req)
3071                ptlrpc_req_finished(req);
3072}
3073EXPORT_SYMBOL(ptlrpcd_destroy_work);
3074
3075int ptlrpcd_queue_work(void *handler)
3076{
3077        struct ptlrpc_request *req = handler;
3078
3079        /*
3080         * Check if the req is already being queued.
3081         *
3082         * Here comes a trick: it lacks a way of checking if a req is being
3083         * processed reliably in ptlrpc. Here I have to use refcount of req
3084         * for this purpose. This is okay because the caller should use this
3085         * req as opaque data. - Jinshan
3086         */
3087        LASSERT(atomic_read(&req->rq_refcount) > 0);
3088        if (atomic_inc_return(&req->rq_refcount) == 2)
3089                ptlrpcd_add_work_req(req);
3090        return 0;
3091}
3092EXPORT_SYMBOL(ptlrpcd_queue_work);
3093