linux/drivers/staging/lustre/lustre/ptlrpc/client.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37/** Implementation of client-side PortalRPC interfaces */
  38
  39#define DEBUG_SUBSYSTEM S_RPC
  40
  41#include "../include/obd_support.h"
  42#include "../include/obd_class.h"
  43#include "../include/lustre_lib.h"
  44#include "../include/lustre_ha.h"
  45#include "../include/lustre_import.h"
  46#include "../include/lustre_req_layout.h"
  47
  48#include "ptlrpc_internal.h"
  49
  50static int ptlrpc_send_new_req(struct ptlrpc_request *req);
  51static int ptlrpcd_check_work(struct ptlrpc_request *req);
  52
  53/**
  54 * Initialize passed in client structure \a cl.
  55 */
  56void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
  57                        struct ptlrpc_client *cl)
  58{
  59        cl->cli_request_portal = req_portal;
  60        cl->cli_reply_portal   = rep_portal;
  61        cl->cli_name       = name;
  62}
  63EXPORT_SYMBOL(ptlrpc_init_client);
  64
  65/**
  66 * Return PortalRPC connection for remote uud \a uuid
  67 */
  68struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
  69{
  70        struct ptlrpc_connection *c;
  71        lnet_nid_t              self;
  72        lnet_process_id_t        peer;
  73        int                    err;
  74
  75        /* ptlrpc_uuid_to_peer() initializes its 2nd parameter
  76         * before accessing its values. */
  77        /* coverity[uninit_use_in_call] */
  78        err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
  79        if (err != 0) {
  80                CNETERR("cannot find peer %s!\n", uuid->uuid);
  81                return NULL;
  82        }
  83
  84        c = ptlrpc_connection_get(peer, self, uuid);
  85        if (c) {
  86                memcpy(c->c_remote_uuid.uuid,
  87                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
  88        }
  89
  90        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
  91
  92        return c;
  93}
  94EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
  95
  96/**
  97 * Allocate and initialize new bulk descriptor on the sender.
  98 * Returns pointer to the descriptor or NULL on error.
  99 */
 100struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
 101                                         unsigned type, unsigned portal)
 102{
 103        struct ptlrpc_bulk_desc *desc;
 104        int i;
 105
 106        OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
 107        if (!desc)
 108                return NULL;
 109
 110        spin_lock_init(&desc->bd_lock);
 111        init_waitqueue_head(&desc->bd_waitq);
 112        desc->bd_max_iov = npages;
 113        desc->bd_iov_count = 0;
 114        desc->bd_portal = portal;
 115        desc->bd_type = type;
 116        desc->bd_md_count = 0;
 117        LASSERT(max_brw > 0);
 118        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
 119        /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
 120         * node. Negotiated ocd_brw_size will always be <= this number. */
 121        for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
 122                LNetInvalidateHandle(&desc->bd_mds[i]);
 123
 124        return desc;
 125}
 126
 127/**
 128 * Prepare bulk descriptor for specified outgoing request \a req that
 129 * can fit \a npages * pages. \a type is bulk type. \a portal is where
 130 * the bulk to be sent. Used on client-side.
 131 * Returns pointer to newly allocated initialized bulk descriptor or NULL on
 132 * error.
 133 */
 134struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
 135                                              unsigned npages, unsigned max_brw,
 136                                              unsigned type, unsigned portal)
 137{
 138        struct obd_import *imp = req->rq_import;
 139        struct ptlrpc_bulk_desc *desc;
 140
 141        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
 142        desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
 143        if (desc == NULL)
 144                return NULL;
 145
 146        desc->bd_import_generation = req->rq_import_generation;
 147        desc->bd_import = class_import_get(imp);
 148        desc->bd_req = req;
 149
 150        desc->bd_cbid.cbid_fn  = client_bulk_callback;
 151        desc->bd_cbid.cbid_arg = desc;
 152
 153        /* This makes req own desc, and free it when she frees herself */
 154        req->rq_bulk = desc;
 155
 156        return desc;
 157}
 158EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
 159
 160/**
 161 * Add a page \a page to the bulk descriptor \a desc.
 162 * Data to transfer in the page starts at offset \a pageoffset and
 163 * amount of data to transfer from the page is \a len
 164 */
 165void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 166                             struct page *page, int pageoffset, int len, int pin)
 167{
 168        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
 169        LASSERT(page != NULL);
 170        LASSERT(pageoffset >= 0);
 171        LASSERT(len > 0);
 172        LASSERT(pageoffset + len <= PAGE_CACHE_SIZE);
 173
 174        desc->bd_nob += len;
 175
 176        if (pin)
 177                page_cache_get(page);
 178
 179        ptlrpc_add_bulk_page(desc, page, pageoffset, len);
 180}
 181EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
 182
 183/**
 184 * Uninitialize and free bulk descriptor \a desc.
 185 * Works on bulk descriptors both from server and client side.
 186 */
 187void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
 188{
 189        int i;
 190
 191        LASSERT(desc != NULL);
 192        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
 193        LASSERT(desc->bd_md_count == 0);         /* network hands off */
 194        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
 195
 196        sptlrpc_enc_pool_put_pages(desc);
 197
 198        if (desc->bd_export)
 199                class_export_put(desc->bd_export);
 200        else
 201                class_import_put(desc->bd_import);
 202
 203        if (unpin) {
 204                for (i = 0; i < desc->bd_iov_count; i++)
 205                        page_cache_release(desc->bd_iov[i].kiov_page);
 206        }
 207
 208        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
 209                                bd_iov[desc->bd_max_iov]));
 210}
 211EXPORT_SYMBOL(__ptlrpc_free_bulk);
 212
 213/**
 214 * Set server timelimit for this req, i.e. how long are we willing to wait
 215 * for reply before timing out this request.
 216 */
 217void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
 218{
 219        __u32 serv_est;
 220        int idx;
 221        struct imp_at *at;
 222
 223        LASSERT(req->rq_import);
 224
 225        if (AT_OFF) {
 226                /* non-AT settings */
 227                /**
 228                 * \a imp_server_timeout means this is reverse import and
 229                 * we send (currently only) ASTs to the client and cannot afford
 230                 * to wait too long for the reply, otherwise the other client
 231                 * (because of which we are sending this request) would
 232                 * timeout waiting for us
 233                 */
 234                req->rq_timeout = req->rq_import->imp_server_timeout ?
 235                                  obd_timeout / 2 : obd_timeout;
 236        } else {
 237                at = &req->rq_import->imp_at;
 238                idx = import_at_get_index(req->rq_import,
 239                                          req->rq_request_portal);
 240                serv_est = at_get(&at->iat_service_estimate[idx]);
 241                req->rq_timeout = at_est2timeout(serv_est);
 242        }
 243        /* We could get even fancier here, using history to predict increased
 244           loading... */
 245
 246        /* Let the server know what this RPC timeout is by putting it in the
 247           reqmsg*/
 248        lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
 249}
 250EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 251
 252/* Adjust max service estimate based on server value */
 253static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
 254                                  unsigned int serv_est)
 255{
 256        int idx;
 257        unsigned int oldse;
 258        struct imp_at *at;
 259
 260        LASSERT(req->rq_import);
 261        at = &req->rq_import->imp_at;
 262
 263        idx = import_at_get_index(req->rq_import, req->rq_request_portal);
 264        /* max service estimates are tracked on the server side,
 265           so just keep minimal history here */
 266        oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
 267        if (oldse != 0)
 268                CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d has changed from %d to %d\n",
 269                       req->rq_import->imp_obd->obd_name, req->rq_request_portal,
 270                       oldse, at_get(&at->iat_service_estimate[idx]));
 271}
 272
 273/* Expected network latency per remote node (secs) */
 274int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
 275{
 276        return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
 277}
 278
 279/* Adjust expected network latency */
 280static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
 281                                      unsigned int service_time)
 282{
 283        unsigned int nl, oldnl;
 284        struct imp_at *at;
 285        time_t now = get_seconds();
 286
 287        LASSERT(req->rq_import);
 288        at = &req->rq_import->imp_at;
 289
 290        /* Network latency is total time less server processing time */
 291        nl = max_t(int, now - req->rq_sent - service_time, 0) + 1/*st rounding*/;
 292        if (service_time > now - req->rq_sent + 3 /* bz16408 */)
 293                CWARN("Reported service time %u > total measured time "
 294                      CFS_DURATION_T"\n", service_time,
 295                      cfs_time_sub(now, req->rq_sent));
 296
 297        oldnl = at_measured(&at->iat_net_latency, nl);
 298        if (oldnl != 0)
 299                CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) has changed from %d to %d\n",
 300                       req->rq_import->imp_obd->obd_name,
 301                       obd_uuid2str(
 302                               &req->rq_import->imp_connection->c_remote_uuid),
 303                       oldnl, at_get(&at->iat_net_latency));
 304}
 305
 306static int unpack_reply(struct ptlrpc_request *req)
 307{
 308        int rc;
 309
 310        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
 311                rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
 312                if (rc) {
 313                        DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
 314                        return -EPROTO;
 315                }
 316        }
 317
 318        rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
 319        if (rc) {
 320                DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
 321                return -EPROTO;
 322        }
 323        return 0;
 324}
 325
 326/**
 327 * Handle an early reply message, called with the rq_lock held.
 328 * If anything goes wrong just ignore it - same as if it never happened
 329 */
 330static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
 331{
 332        struct ptlrpc_request *early_req;
 333        time_t           olddl;
 334        int                 rc;
 335
 336        req->rq_early = 0;
 337        spin_unlock(&req->rq_lock);
 338
 339        rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
 340        if (rc) {
 341                spin_lock(&req->rq_lock);
 342                return rc;
 343        }
 344
 345        rc = unpack_reply(early_req);
 346        if (rc == 0) {
 347                /* Expecting to increase the service time estimate here */
 348                ptlrpc_at_adj_service(req,
 349                        lustre_msg_get_timeout(early_req->rq_repmsg));
 350                ptlrpc_at_adj_net_latency(req,
 351                        lustre_msg_get_service_time(early_req->rq_repmsg));
 352        }
 353
 354        sptlrpc_cli_finish_early_reply(early_req);
 355
 356        if (rc != 0) {
 357                spin_lock(&req->rq_lock);
 358                return rc;
 359        }
 360
 361        /* Adjust the local timeout for this req */
 362        ptlrpc_at_set_req_timeout(req);
 363
 364        spin_lock(&req->rq_lock);
 365        olddl = req->rq_deadline;
 366        /* server assumes it now has rq_timeout from when it sent the
 367         * early reply, so client should give it at least that long. */
 368        req->rq_deadline = get_seconds() + req->rq_timeout +
 369                           ptlrpc_at_get_net_latency(req);
 370
 371        DEBUG_REQ(D_ADAPTTO, req,
 372                  "Early reply #%d, new deadline in " CFS_DURATION_T "s (" CFS_DURATION_T "s)",
 373                  req->rq_early_count,
 374                  cfs_time_sub(req->rq_deadline, get_seconds()),
 375                  cfs_time_sub(req->rq_deadline, olddl));
 376
 377        return rc;
 378}
 379
 380struct kmem_cache *request_cache;
 381
 382int ptlrpc_request_cache_init(void)
 383{
 384        request_cache = kmem_cache_create("ptlrpc_cache",
 385                                          sizeof(struct ptlrpc_request),
 386                                          0, SLAB_HWCACHE_ALIGN, NULL);
 387        return request_cache == NULL ? -ENOMEM : 0;
 388}
 389
 390void ptlrpc_request_cache_fini(void)
 391{
 392        kmem_cache_destroy(request_cache);
 393}
 394
 395struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags)
 396{
 397        struct ptlrpc_request *req;
 398
 399        OBD_SLAB_ALLOC_PTR_GFP(req, request_cache, flags);
 400        return req;
 401}
 402
 403void ptlrpc_request_cache_free(struct ptlrpc_request *req)
 404{
 405        OBD_SLAB_FREE_PTR(req, request_cache);
 406}
 407
 408/**
 409 * Wind down request pool \a pool.
 410 * Frees all requests from the pool too
 411 */
 412void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
 413{
 414        struct list_head *l, *tmp;
 415        struct ptlrpc_request *req;
 416
 417        LASSERT(pool != NULL);
 418
 419        spin_lock(&pool->prp_lock);
 420        list_for_each_safe(l, tmp, &pool->prp_req_list) {
 421                req = list_entry(l, struct ptlrpc_request, rq_list);
 422                list_del(&req->rq_list);
 423                LASSERT(req->rq_reqbuf);
 424                LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
 425                OBD_FREE_LARGE(req->rq_reqbuf, pool->prp_rq_size);
 426                ptlrpc_request_cache_free(req);
 427        }
 428        spin_unlock(&pool->prp_lock);
 429        OBD_FREE(pool, sizeof(*pool));
 430}
 431EXPORT_SYMBOL(ptlrpc_free_rq_pool);
 432
 433/**
 434 * Allocates, initializes and adds \a num_rq requests to the pool \a pool
 435 */
 436void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
 437{
 438        int i;
 439        int size = 1;
 440
 441        while (size < pool->prp_rq_size)
 442                size <<= 1;
 443
 444        LASSERTF(list_empty(&pool->prp_req_list) ||
 445                 size == pool->prp_rq_size,
 446                 "Trying to change pool size with nonempty pool from %d to %d bytes\n",
 447                 pool->prp_rq_size, size);
 448
 449        spin_lock(&pool->prp_lock);
 450        pool->prp_rq_size = size;
 451        for (i = 0; i < num_rq; i++) {
 452                struct ptlrpc_request *req;
 453                struct lustre_msg *msg;
 454
 455                spin_unlock(&pool->prp_lock);
 456                req = ptlrpc_request_cache_alloc(GFP_NOFS);
 457                if (!req)
 458                        return;
 459                OBD_ALLOC_LARGE(msg, size);
 460                if (!msg) {
 461                        ptlrpc_request_cache_free(req);
 462                        return;
 463                }
 464                req->rq_reqbuf = msg;
 465                req->rq_reqbuf_len = size;
 466                req->rq_pool = pool;
 467                spin_lock(&pool->prp_lock);
 468                list_add_tail(&req->rq_list, &pool->prp_req_list);
 469        }
 470        spin_unlock(&pool->prp_lock);
 471        return;
 472}
 473EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
 474
 475/**
 476 * Create and initialize new request pool with given attributes:
 477 * \a num_rq - initial number of requests to create for the pool
 478 * \a msgsize - maximum message size possible for requests in thid pool
 479 * \a populate_pool - function to be called when more requests need to be added
 480 *                  to the pool
 481 * Returns pointer to newly created pool or NULL on error.
 482 */
 483struct ptlrpc_request_pool *
 484ptlrpc_init_rq_pool(int num_rq, int msgsize,
 485                    void (*populate_pool)(struct ptlrpc_request_pool *, int))
 486{
 487        struct ptlrpc_request_pool *pool;
 488
 489        OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
 490        if (!pool)
 491                return NULL;
 492
 493        /* Request next power of two for the allocation, because internally
 494           kernel would do exactly this */
 495
 496        spin_lock_init(&pool->prp_lock);
 497        INIT_LIST_HEAD(&pool->prp_req_list);
 498        pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
 499        pool->prp_populate = populate_pool;
 500
 501        populate_pool(pool, num_rq);
 502
 503        if (list_empty(&pool->prp_req_list)) {
 504                /* have not allocated a single request for the pool */
 505                OBD_FREE(pool, sizeof(struct ptlrpc_request_pool));
 506                pool = NULL;
 507        }
 508        return pool;
 509}
 510EXPORT_SYMBOL(ptlrpc_init_rq_pool);
 511
 512/**
 513 * Fetches one request from pool \a pool
 514 */
 515static struct ptlrpc_request *
 516ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
 517{
 518        struct ptlrpc_request *request;
 519        struct lustre_msg *reqbuf;
 520
 521        if (!pool)
 522                return NULL;
 523
 524        spin_lock(&pool->prp_lock);
 525
 526        /* See if we have anything in a pool, and bail out if nothing,
 527         * in writeout path, where this matters, this is safe to do, because
 528         * nothing is lost in this case, and when some in-flight requests
 529         * complete, this code will be called again. */
 530        if (unlikely(list_empty(&pool->prp_req_list))) {
 531                spin_unlock(&pool->prp_lock);
 532                return NULL;
 533        }
 534
 535        request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
 536                                 rq_list);
 537        list_del_init(&request->rq_list);
 538        spin_unlock(&pool->prp_lock);
 539
 540        LASSERT(request->rq_reqbuf);
 541        LASSERT(request->rq_pool);
 542
 543        reqbuf = request->rq_reqbuf;
 544        memset(request, 0, sizeof(*request));
 545        request->rq_reqbuf = reqbuf;
 546        request->rq_reqbuf_len = pool->prp_rq_size;
 547        request->rq_pool = pool;
 548
 549        return request;
 550}
 551
 552/**
 553 * Returns freed \a request to pool.
 554 */
 555static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
 556{
 557        struct ptlrpc_request_pool *pool = request->rq_pool;
 558
 559        spin_lock(&pool->prp_lock);
 560        LASSERT(list_empty(&request->rq_list));
 561        LASSERT(!request->rq_receiving_reply);
 562        list_add_tail(&request->rq_list, &pool->prp_req_list);
 563        spin_unlock(&pool->prp_lock);
 564}
 565
 566static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 567                                      __u32 version, int opcode,
 568                                      int count, __u32 *lengths, char **bufs,
 569                                      struct ptlrpc_cli_ctx *ctx)
 570{
 571        struct obd_import  *imp = request->rq_import;
 572        int              rc;
 573
 574        if (unlikely(ctx))
 575                request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
 576        else {
 577                rc = sptlrpc_req_get_ctx(request);
 578                if (rc)
 579                        goto out_free;
 580        }
 581
 582        sptlrpc_req_set_flavor(request, opcode);
 583
 584        rc = lustre_pack_request(request, imp->imp_msg_magic, count,
 585                                 lengths, bufs);
 586        if (rc) {
 587                LASSERT(!request->rq_pool);
 588                goto out_ctx;
 589        }
 590
 591        lustre_msg_add_version(request->rq_reqmsg, version);
 592        request->rq_send_state = LUSTRE_IMP_FULL;
 593        request->rq_type = PTL_RPC_MSG_REQUEST;
 594        request->rq_export = NULL;
 595
 596        request->rq_req_cbid.cbid_fn  = request_out_callback;
 597        request->rq_req_cbid.cbid_arg = request;
 598
 599        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
 600        request->rq_reply_cbid.cbid_arg = request;
 601
 602        request->rq_reply_deadline = 0;
 603        request->rq_phase = RQ_PHASE_NEW;
 604        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 605
 606        request->rq_request_portal = imp->imp_client->cli_request_portal;
 607        request->rq_reply_portal = imp->imp_client->cli_reply_portal;
 608
 609        ptlrpc_at_set_req_timeout(request);
 610
 611        spin_lock_init(&request->rq_lock);
 612        INIT_LIST_HEAD(&request->rq_list);
 613        INIT_LIST_HEAD(&request->rq_timed_list);
 614        INIT_LIST_HEAD(&request->rq_replay_list);
 615        INIT_LIST_HEAD(&request->rq_ctx_chain);
 616        INIT_LIST_HEAD(&request->rq_set_chain);
 617        INIT_LIST_HEAD(&request->rq_history_list);
 618        INIT_LIST_HEAD(&request->rq_exp_list);
 619        init_waitqueue_head(&request->rq_reply_waitq);
 620        init_waitqueue_head(&request->rq_set_waitq);
 621        request->rq_xid = ptlrpc_next_xid();
 622        atomic_set(&request->rq_refcount, 1);
 623
 624        lustre_msg_set_opc(request->rq_reqmsg, opcode);
 625
 626        return 0;
 627out_ctx:
 628        sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
 629out_free:
 630        class_import_put(imp);
 631        return rc;
 632}
 633
 634int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 635                             __u32 version, int opcode, char **bufs,
 636                             struct ptlrpc_cli_ctx *ctx)
 637{
 638        int count;
 639
 640        count = req_capsule_filled_sizes(&request->rq_pill, RCL_CLIENT);
 641        return __ptlrpc_request_bufs_pack(request, version, opcode, count,
 642                                          request->rq_pill.rc_area[RCL_CLIENT],
 643                                          bufs, ctx);
 644}
 645EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
 646
 647/**
 648 * Pack request buffers for network transfer, performing necessary encryption
 649 * steps if necessary.
 650 */
 651int ptlrpc_request_pack(struct ptlrpc_request *request,
 652                        __u32 version, int opcode)
 653{
 654        int rc;
 655        rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
 656        if (rc)
 657                return rc;
 658
 659        /* For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
 660         * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
 661         * have to send old ptlrpc_body to keep interoperability with these
 662         * clients.
 663         *
 664         * Only three kinds of server->client RPCs so far:
 665         *  - LDLM_BL_CALLBACK
 666         *  - LDLM_CP_CALLBACK
 667         *  - LDLM_GL_CALLBACK
 668         *
 669         * XXX This should be removed whenever we drop the interoperability with
 670         *     the these old clients.
 671         */
 672        if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
 673            opcode == LDLM_GL_CALLBACK)
 674                req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
 675                                   sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
 676
 677        return rc;
 678}
 679EXPORT_SYMBOL(ptlrpc_request_pack);
 680
 681/**
 682 * Helper function to allocate new request on import \a imp
 683 * and possibly using existing request from pool \a pool if provided.
 684 * Returns allocated request structure with import field filled or
 685 * NULL on error.
 686 */
 687static inline
 688struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
 689                                              struct ptlrpc_request_pool *pool)
 690{
 691        struct ptlrpc_request *request = NULL;
 692
 693        if (pool)
 694                request = ptlrpc_prep_req_from_pool(pool);
 695
 696        if (!request)
 697                request = ptlrpc_request_cache_alloc(GFP_NOFS);
 698
 699        if (request) {
 700                LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
 701                LASSERT(imp != LP_POISON);
 702                LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p",
 703                        imp->imp_client);
 704                LASSERT(imp->imp_client != LP_POISON);
 705
 706                request->rq_import = class_import_get(imp);
 707        } else {
 708                CERROR("request allocation out of memory\n");
 709        }
 710
 711        return request;
 712}
 713
 714/**
 715 * Helper function for creating a request.
 716 * Calls __ptlrpc_request_alloc to allocate new request structure and inits
 717 * buffer structures according to capsule template \a format.
 718 * Returns allocated request structure pointer or NULL on error.
 719 */
 720static struct ptlrpc_request *
 721ptlrpc_request_alloc_internal(struct obd_import *imp,
 722                              struct ptlrpc_request_pool *pool,
 723                              const struct req_format *format)
 724{
 725        struct ptlrpc_request *request;
 726
 727        request = __ptlrpc_request_alloc(imp, pool);
 728        if (request == NULL)
 729                return NULL;
 730
 731        req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
 732        req_capsule_set(&request->rq_pill, format);
 733        return request;
 734}
 735
 736/**
 737 * Allocate new request structure for import \a imp and initialize its
 738 * buffer structure according to capsule template \a format.
 739 */
 740struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
 741                                            const struct req_format *format)
 742{
 743        return ptlrpc_request_alloc_internal(imp, NULL, format);
 744}
 745EXPORT_SYMBOL(ptlrpc_request_alloc);
 746
 747/**
 748 * Allocate new request structure for import \a imp from pool \a pool and
 749 * initialize its buffer structure according to capsule template \a format.
 750 */
 751struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
 752                                            struct ptlrpc_request_pool *pool,
 753                                            const struct req_format *format)
 754{
 755        return ptlrpc_request_alloc_internal(imp, pool, format);
 756}
 757EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
 758
 759/**
 760 * For requests not from pool, free memory of the request structure.
 761 * For requests obtained from a pool earlier, return request back to pool.
 762 */
 763void ptlrpc_request_free(struct ptlrpc_request *request)
 764{
 765        if (request->rq_pool)
 766                __ptlrpc_free_req_to_pool(request);
 767        else
 768                ptlrpc_request_cache_free(request);
 769}
 770EXPORT_SYMBOL(ptlrpc_request_free);
 771
 772/**
 773 * Allocate new request for operation \a opcode and immediately pack it for
 774 * network transfer.
 775 * Only used for simple requests like OBD_PING where the only important
 776 * part of the request is operation itself.
 777 * Returns allocated request or NULL on error.
 778 */
 779struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
 780                                                const struct req_format *format,
 781                                                __u32 version, int opcode)
 782{
 783        struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format);
 784        int                 rc;
 785
 786        if (req) {
 787                rc = ptlrpc_request_pack(req, version, opcode);
 788                if (rc) {
 789                        ptlrpc_request_free(req);
 790                        req = NULL;
 791                }
 792        }
 793        return req;
 794}
 795EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
 796
 797/**
 798 * Prepare request (fetched from pool \a pool if not NULL) on import \a imp
 799 * for operation \a opcode. Request would contain \a count buffers.
 800 * Sizes of buffers are described in array \a lengths and buffers themselves
 801 * are provided by a pointer \a bufs.
 802 * Returns prepared request structure pointer or NULL on error.
 803 */
 804struct ptlrpc_request *
 805ptlrpc_prep_req_pool(struct obd_import *imp,
 806                     __u32 version, int opcode,
 807                     int count, __u32 *lengths, char **bufs,
 808                     struct ptlrpc_request_pool *pool)
 809{
 810        struct ptlrpc_request *request;
 811        int                 rc;
 812
 813        request = __ptlrpc_request_alloc(imp, pool);
 814        if (!request)
 815                return NULL;
 816
 817        rc = __ptlrpc_request_bufs_pack(request, version, opcode, count,
 818                                        lengths, bufs, NULL);
 819        if (rc) {
 820                ptlrpc_request_free(request);
 821                request = NULL;
 822        }
 823        return request;
 824}
 825EXPORT_SYMBOL(ptlrpc_prep_req_pool);
 826
 827/**
 828 * Same as ptlrpc_prep_req_pool, but without pool
 829 */
 830struct ptlrpc_request *
 831ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
 832                __u32 *lengths, char **bufs)
 833{
 834        return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs,
 835                                    NULL);
 836}
 837EXPORT_SYMBOL(ptlrpc_prep_req);
 838
 839/**
 840 * Allocate and initialize new request set structure.
 841 * Returns a pointer to the newly allocated set structure or NULL on error.
 842 */
 843struct ptlrpc_request_set *ptlrpc_prep_set(void)
 844{
 845        struct ptlrpc_request_set *set;
 846
 847        OBD_ALLOC(set, sizeof(*set));
 848        if (!set)
 849                return NULL;
 850        atomic_set(&set->set_refcount, 1);
 851        INIT_LIST_HEAD(&set->set_requests);
 852        init_waitqueue_head(&set->set_waitq);
 853        atomic_set(&set->set_new_count, 0);
 854        atomic_set(&set->set_remaining, 0);
 855        spin_lock_init(&set->set_new_req_lock);
 856        INIT_LIST_HEAD(&set->set_new_requests);
 857        INIT_LIST_HEAD(&set->set_cblist);
 858        set->set_max_inflight = UINT_MAX;
 859        set->set_producer     = NULL;
 860        set->set_producer_arg = NULL;
 861        set->set_rc        = 0;
 862
 863        return set;
 864}
 865EXPORT_SYMBOL(ptlrpc_prep_set);
 866
 867/**
 868 * Allocate and initialize new request set structure with flow control
 869 * extension. This extension allows to control the number of requests in-flight
 870 * for the whole set. A callback function to generate requests must be provided
 871 * and the request set will keep the number of requests sent over the wire to
 872 * @max_inflight.
 873 * Returns a pointer to the newly allocated set structure or NULL on error.
 874 */
 875struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
 876                                             void *arg)
 877
 878{
 879        struct ptlrpc_request_set *set;
 880
 881        set = ptlrpc_prep_set();
 882        if (!set)
 883                return NULL;
 884
 885        set->set_max_inflight  = max;
 886        set->set_producer      = func;
 887        set->set_producer_arg  = arg;
 888
 889        return set;
 890}
 891EXPORT_SYMBOL(ptlrpc_prep_fcset);
 892
 893/**
 894 * Wind down and free request set structure previously allocated with
 895 * ptlrpc_prep_set.
 896 * Ensures that all requests on the set have completed and removes
 897 * all requests from the request list in a set.
 898 * If any unsent request happen to be on the list, pretends that they got
 899 * an error in flight and calls their completion handler.
 900 */
 901void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 902{
 903        struct list_head       *tmp;
 904        struct list_head       *next;
 905        int            expected_phase;
 906        int            n = 0;
 907
 908        /* Requests on the set should either all be completed, or all be new */
 909        expected_phase = (atomic_read(&set->set_remaining) == 0) ?
 910                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
 911        list_for_each(tmp, &set->set_requests) {
 912                struct ptlrpc_request *req =
 913                        list_entry(tmp, struct ptlrpc_request,
 914                                       rq_set_chain);
 915
 916                LASSERT(req->rq_phase == expected_phase);
 917                n++;
 918        }
 919
 920        LASSERTF(atomic_read(&set->set_remaining) == 0 ||
 921                 atomic_read(&set->set_remaining) == n, "%d / %d\n",
 922                 atomic_read(&set->set_remaining), n);
 923
 924        list_for_each_safe(tmp, next, &set->set_requests) {
 925                struct ptlrpc_request *req =
 926                        list_entry(tmp, struct ptlrpc_request,
 927                                       rq_set_chain);
 928                list_del_init(&req->rq_set_chain);
 929
 930                LASSERT(req->rq_phase == expected_phase);
 931
 932                if (req->rq_phase == RQ_PHASE_NEW) {
 933                        ptlrpc_req_interpret(NULL, req, -EBADR);
 934                        atomic_dec(&set->set_remaining);
 935                }
 936
 937                spin_lock(&req->rq_lock);
 938                req->rq_set = NULL;
 939                req->rq_invalid_rqset = 0;
 940                spin_unlock(&req->rq_lock);
 941
 942                ptlrpc_req_finished(req);
 943        }
 944
 945        LASSERT(atomic_read(&set->set_remaining) == 0);
 946
 947        ptlrpc_reqset_put(set);
 948}
 949EXPORT_SYMBOL(ptlrpc_set_destroy);
 950
 951/**
 952 * Add a callback function \a fn to the set.
 953 * This function would be called when all requests on this set are completed.
 954 * The function will be passed \a data argument.
 955 */
 956int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
 957                      set_interpreter_func fn, void *data)
 958{
 959        struct ptlrpc_set_cbdata *cbdata;
 960
 961        OBD_ALLOC_PTR(cbdata);
 962        if (cbdata == NULL)
 963                return -ENOMEM;
 964
 965        cbdata->psc_interpret = fn;
 966        cbdata->psc_data = data;
 967        list_add_tail(&cbdata->psc_item, &set->set_cblist);
 968
 969        return 0;
 970}
 971EXPORT_SYMBOL(ptlrpc_set_add_cb);
 972
 973/**
 974 * Add a new request to the general purpose request set.
 975 * Assumes request reference from the caller.
 976 */
 977void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
 978                        struct ptlrpc_request *req)
 979{
 980        LASSERT(list_empty(&req->rq_set_chain));
 981
 982        /* The set takes over the caller's request reference */
 983        list_add_tail(&req->rq_set_chain, &set->set_requests);
 984        req->rq_set = set;
 985        atomic_inc(&set->set_remaining);
 986        req->rq_queued_time = cfs_time_current();
 987
 988        if (req->rq_reqmsg != NULL)
 989                lustre_msg_set_jobid(req->rq_reqmsg, NULL);
 990
 991        if (set->set_producer != NULL)
 992                /* If the request set has a producer callback, the RPC must be
 993                 * sent straight away */
 994                ptlrpc_send_new_req(req);
 995}
 996EXPORT_SYMBOL(ptlrpc_set_add_req);
 997
 998/**
 999 * Add a request to a request with dedicated server thread
1000 * and wake the thread to make any necessary processing.
1001 * Currently only used for ptlrpcd.
1002 */
1003void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
1004                           struct ptlrpc_request *req)
1005{
1006        struct ptlrpc_request_set *set = pc->pc_set;
1007        int count, i;
1008
1009        LASSERT(req->rq_set == NULL);
1010        LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0);
1011
1012        spin_lock(&set->set_new_req_lock);
1013        /*
1014         * The set takes over the caller's request reference.
1015         */
1016        req->rq_set = set;
1017        req->rq_queued_time = cfs_time_current();
1018        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
1019        count = atomic_inc_return(&set->set_new_count);
1020        spin_unlock(&set->set_new_req_lock);
1021
1022        /* Only need to call wakeup once for the first entry. */
1023        if (count == 1) {
1024                wake_up(&set->set_waitq);
1025
1026                /* XXX: It maybe unnecessary to wakeup all the partners. But to
1027                 *      guarantee the async RPC can be processed ASAP, we have
1028                 *      no other better choice. It maybe fixed in future. */
1029                for (i = 0; i < pc->pc_npartners; i++)
1030                        wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
1031        }
1032}
1033EXPORT_SYMBOL(ptlrpc_set_add_new_req);
1034
1035/**
1036 * Based on the current state of the import, determine if the request
1037 * can be sent, is an error, or should be delayed.
1038 *
1039 * Returns true if this request should be delayed. If false, and
1040 * *status is set, then the request can not be sent and *status is the
1041 * error code.  If false and status is 0, then request can be sent.
1042 *
1043 * The imp->imp_lock must be held.
1044 */
1045static int ptlrpc_import_delay_req(struct obd_import *imp,
1046                                   struct ptlrpc_request *req, int *status)
1047{
1048        int delay = 0;
1049
1050        LASSERT(status != NULL);
1051        *status = 0;
1052
1053        if (req->rq_ctx_init || req->rq_ctx_fini) {
1054                /* always allow ctx init/fini rpc go through */
1055        } else if (imp->imp_state == LUSTRE_IMP_NEW) {
1056                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
1057                *status = -EIO;
1058        } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
1059                /* pings may safely race with umount */
1060                DEBUG_REQ(lustre_msg_get_opc(req->rq_reqmsg) == OBD_PING ?
1061                          D_HA : D_ERROR, req, "IMP_CLOSED ");
1062                *status = -EIO;
1063        } else if (ptlrpc_send_limit_expired(req)) {
1064                /* probably doesn't need to be a D_ERROR after initial testing */
1065                DEBUG_REQ(D_ERROR, req, "send limit expired ");
1066                *status = -EIO;
1067        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
1068                   imp->imp_state == LUSTRE_IMP_CONNECTING) {
1069                /* allow CONNECT even if import is invalid */
1070                if (atomic_read(&imp->imp_inval_count) != 0) {
1071                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1072                        *status = -EIO;
1073                }
1074        } else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) {
1075                if (!imp->imp_deactive)
1076                        DEBUG_REQ(D_NET, req, "IMP_INVALID");
1077                *status = -ESHUTDOWN; /* bz 12940 */
1078        } else if (req->rq_import_generation != imp->imp_generation) {
1079                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
1080                *status = -EIO;
1081        } else if (req->rq_send_state != imp->imp_state) {
1082                /* invalidate in progress - any requests should be drop */
1083                if (atomic_read(&imp->imp_inval_count) != 0) {
1084                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1085                        *status = -EIO;
1086                } else if (imp->imp_dlm_fake || req->rq_no_delay) {
1087                        *status = -EWOULDBLOCK;
1088                } else if (req->rq_allow_replay &&
1089                          (imp->imp_state == LUSTRE_IMP_REPLAY ||
1090                           imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
1091                           imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
1092                           imp->imp_state == LUSTRE_IMP_RECOVER)) {
1093                        DEBUG_REQ(D_HA, req, "allow during recovery.\n");
1094                } else {
1095                        delay = 1;
1096                }
1097        }
1098
1099        return delay;
1100}
1101
1102/**
1103 * Decide if the error message regarding provided request \a req
1104 * should be printed to the console or not.
1105 * Makes it's decision on request status and other properties.
1106 * Returns 1 to print error on the system console or 0 if not.
1107 */
1108static int ptlrpc_console_allow(struct ptlrpc_request *req)
1109{
1110        __u32 opc;
1111        int err;
1112
1113        LASSERT(req->rq_reqmsg != NULL);
1114        opc = lustre_msg_get_opc(req->rq_reqmsg);
1115
1116        /* Suppress particular reconnect errors which are to be expected.  No
1117         * errors are suppressed for the initial connection on an import */
1118        if ((lustre_handle_is_used(&req->rq_import->imp_remote_handle)) &&
1119            (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT)) {
1120
1121                /* Suppress timed out reconnect requests */
1122                if (req->rq_timedout)
1123                        return 0;
1124
1125                /* Suppress unavailable/again reconnect requests */
1126                err = lustre_msg_get_status(req->rq_repmsg);
1127                if (err == -ENODEV || err == -EAGAIN)
1128                        return 0;
1129        }
1130
1131        return 1;
1132}
1133
1134/**
1135 * Check request processing status.
1136 * Returns the status.
1137 */
1138static int ptlrpc_check_status(struct ptlrpc_request *req)
1139{
1140        int err;
1141
1142        err = lustre_msg_get_status(req->rq_repmsg);
1143        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
1144                struct obd_import *imp = req->rq_import;
1145                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1146                if (ptlrpc_console_allow(req))
1147                        LCONSOLE_ERROR_MSG(0x011, "%s: Communicating with %s, operation %s failed with %d.\n",
1148                                           imp->imp_obd->obd_name,
1149                                           libcfs_nid2str(
1150                                                   imp->imp_connection->c_peer.nid),
1151                                           ll_opcode2str(opc), err);
1152                return err < 0 ? err : -EINVAL;
1153        }
1154
1155        if (err < 0) {
1156                DEBUG_REQ(D_INFO, req, "status is %d", err);
1157        } else if (err > 0) {
1158                /* XXX: translate this error from net to host */
1159                DEBUG_REQ(D_INFO, req, "status is %d", err);
1160        }
1161
1162        return err;
1163}
1164
1165/**
1166 * save pre-versions of objects into request for replay.
1167 * Versions are obtained from server reply.
1168 * used for VBR.
1169 */
1170static void ptlrpc_save_versions(struct ptlrpc_request *req)
1171{
1172        struct lustre_msg *repmsg = req->rq_repmsg;
1173        struct lustre_msg *reqmsg = req->rq_reqmsg;
1174        __u64 *versions = lustre_msg_get_versions(repmsg);
1175
1176        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1177                return;
1178
1179        LASSERT(versions);
1180        lustre_msg_set_versions(reqmsg, versions);
1181        CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
1182               versions[0], versions[1]);
1183}
1184
1185/**
1186 * Callback function called when client receives RPC reply for \a req.
1187 * Returns 0 on success or error code.
1188 * The return value would be assigned to req->rq_status by the caller
1189 * as request processing status.
1190 * This function also decides if the request needs to be saved for later replay.
1191 */
1192static int after_reply(struct ptlrpc_request *req)
1193{
1194        struct obd_import *imp = req->rq_import;
1195        struct obd_device *obd = req->rq_import->imp_obd;
1196        int rc;
1197        struct timeval work_start;
1198        long timediff;
1199
1200        LASSERT(obd != NULL);
1201        /* repbuf must be unlinked */
1202        LASSERT(!req->rq_receiving_reply && !req->rq_reply_unlink);
1203
1204        if (req->rq_reply_truncate) {
1205                if (ptlrpc_no_resend(req)) {
1206                        DEBUG_REQ(D_ERROR, req, "reply buffer overflow, expected: %d, actual size: %d",
1207                                  req->rq_nob_received, req->rq_repbuf_len);
1208                        return -EOVERFLOW;
1209                }
1210
1211                sptlrpc_cli_free_repbuf(req);
1212                /* Pass the required reply buffer size (include
1213                 * space for early reply).
1214                 * NB: no need to roundup because alloc_repbuf
1215                 * will roundup it */
1216                req->rq_replen       = req->rq_nob_received;
1217                req->rq_nob_received = 0;
1218                spin_lock(&req->rq_lock);
1219                req->rq_resend       = 1;
1220                spin_unlock(&req->rq_lock);
1221                return 0;
1222        }
1223
1224        /*
1225         * NB Until this point, the whole of the incoming message,
1226         * including buflens, status etc is in the sender's byte order.
1227         */
1228        rc = sptlrpc_cli_unwrap_reply(req);
1229        if (rc) {
1230                DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
1231                return rc;
1232        }
1233
1234        /*
1235         * Security layer unwrap might ask resend this request.
1236         */
1237        if (req->rq_resend)
1238                return 0;
1239
1240        rc = unpack_reply(req);
1241        if (rc)
1242                return rc;
1243
1244        /* retry indefinitely on EINPROGRESS */
1245        if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
1246            ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
1247                time_t  now = get_seconds();
1248
1249                DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
1250                spin_lock(&req->rq_lock);
1251                req->rq_resend = 1;
1252                spin_unlock(&req->rq_lock);
1253                req->rq_nr_resend++;
1254
1255                /* allocate new xid to avoid reply reconstruction */
1256                if (!req->rq_bulk) {
1257                        /* new xid is already allocated for bulk in
1258                         * ptlrpc_check_set() */
1259                        req->rq_xid = ptlrpc_next_xid();
1260                        DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS");
1261                }
1262
1263                /* Readjust the timeout for current conditions */
1264                ptlrpc_at_set_req_timeout(req);
1265                /* delay resend to give a chance to the server to get ready.
1266                 * The delay is increased by 1s on every resend and is capped to
1267                 * the current request timeout (i.e. obd_timeout if AT is off,
1268                 * or AT service time x 125% + 5s, see at_est2timeout) */
1269                if (req->rq_nr_resend > req->rq_timeout)
1270                        req->rq_sent = now + req->rq_timeout;
1271                else
1272                        req->rq_sent = now + req->rq_nr_resend;
1273
1274                return 0;
1275        }
1276
1277        do_gettimeofday(&work_start);
1278        timediff = cfs_timeval_sub(&work_start, &req->rq_arrival_time, NULL);
1279        if (obd->obd_svc_stats != NULL) {
1280                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
1281                                    timediff);
1282                ptlrpc_lprocfs_rpc_sent(req, timediff);
1283        }
1284
1285        if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
1286            lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
1287                DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
1288                          lustre_msg_get_type(req->rq_repmsg));
1289                return -EPROTO;
1290        }
1291
1292        if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1293                CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
1294        ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
1295        ptlrpc_at_adj_net_latency(req,
1296                                  lustre_msg_get_service_time(req->rq_repmsg));
1297
1298        rc = ptlrpc_check_status(req);
1299        imp->imp_connect_error = rc;
1300
1301        if (rc) {
1302                /*
1303                 * Either we've been evicted, or the server has failed for
1304                 * some reason. Try to reconnect, and if that fails, punt to
1305                 * the upcall.
1306                 */
1307                if (ll_rpc_recoverable_error(rc)) {
1308                        if (req->rq_send_state != LUSTRE_IMP_FULL ||
1309                            imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
1310                                return rc;
1311                        }
1312                        ptlrpc_request_handle_notconn(req);
1313                        return rc;
1314                }
1315        } else {
1316                /*
1317                 * Let's look if server sent slv. Do it only for RPC with
1318                 * rc == 0.
1319                 */
1320                ldlm_cli_update_pool(req);
1321        }
1322
1323        /*
1324         * Store transno in reqmsg for replay.
1325         */
1326        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1327                req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
1328                lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
1329        }
1330
1331        if (imp->imp_replayable) {
1332                spin_lock(&imp->imp_lock);
1333                /*
1334                 * No point in adding already-committed requests to the replay
1335                 * list, we will just remove them immediately. b=9829
1336                 */
1337                if (req->rq_transno != 0 &&
1338                    (req->rq_transno >
1339                     lustre_msg_get_last_committed(req->rq_repmsg) ||
1340                     req->rq_replay)) {
1341                        /** version recovery */
1342                        ptlrpc_save_versions(req);
1343                        ptlrpc_retain_replayable_request(req, imp);
1344                } else if (req->rq_commit_cb != NULL &&
1345                           list_empty(&req->rq_replay_list)) {
1346                        /* NB: don't call rq_commit_cb if it's already on
1347                         * rq_replay_list, ptlrpc_free_committed() will call
1348                         * it later, see LU-3618 for details */
1349                        spin_unlock(&imp->imp_lock);
1350                        req->rq_commit_cb(req);
1351                        spin_lock(&imp->imp_lock);
1352                }
1353
1354                /*
1355                 * Replay-enabled imports return commit-status information.
1356                 */
1357                if (lustre_msg_get_last_committed(req->rq_repmsg)) {
1358                        imp->imp_peer_committed_transno =
1359                                lustre_msg_get_last_committed(req->rq_repmsg);
1360                }
1361
1362                ptlrpc_free_committed(imp);
1363
1364                if (!list_empty(&imp->imp_replay_list)) {
1365                        struct ptlrpc_request *last;
1366
1367                        last = list_entry(imp->imp_replay_list.prev,
1368                                              struct ptlrpc_request,
1369                                              rq_replay_list);
1370                        /*
1371                         * Requests with rq_replay stay on the list even if no
1372                         * commit is expected.
1373                         */
1374                        if (last->rq_transno > imp->imp_peer_committed_transno)
1375                                ptlrpc_pinger_commit_expected(imp);
1376                }
1377
1378                spin_unlock(&imp->imp_lock);
1379        }
1380
1381        return rc;
1382}
1383
1384/**
1385 * Helper function to send request \a req over the network for the first time
1386 * Also adjusts request phase.
1387 * Returns 0 on success or error code.
1388 */
1389static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1390{
1391        struct obd_import     *imp = req->rq_import;
1392        int rc;
1393
1394        LASSERT(req->rq_phase == RQ_PHASE_NEW);
1395        if (req->rq_sent && (req->rq_sent > get_seconds()) &&
1396            (!req->rq_generation_set ||
1397             req->rq_import_generation == imp->imp_generation))
1398                return 0;
1399
1400        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
1401
1402        spin_lock(&imp->imp_lock);
1403
1404        if (!req->rq_generation_set)
1405                req->rq_import_generation = imp->imp_generation;
1406
1407        if (ptlrpc_import_delay_req(imp, req, &rc)) {
1408                spin_lock(&req->rq_lock);
1409                req->rq_waiting = 1;
1410                spin_unlock(&req->rq_lock);
1411
1412                DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: (%s != %s)",
1413                          lustre_msg_get_status(req->rq_reqmsg),
1414                          ptlrpc_import_state_name(req->rq_send_state),
1415                          ptlrpc_import_state_name(imp->imp_state));
1416                LASSERT(list_empty(&req->rq_list));
1417                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1418                atomic_inc(&req->rq_import->imp_inflight);
1419                spin_unlock(&imp->imp_lock);
1420                return 0;
1421        }
1422
1423        if (rc != 0) {
1424                spin_unlock(&imp->imp_lock);
1425                req->rq_status = rc;
1426                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1427                return rc;
1428        }
1429
1430        LASSERT(list_empty(&req->rq_list));
1431        list_add_tail(&req->rq_list, &imp->imp_sending_list);
1432        atomic_inc(&req->rq_import->imp_inflight);
1433        spin_unlock(&imp->imp_lock);
1434
1435        lustre_msg_set_status(req->rq_reqmsg, current_pid());
1436
1437        rc = sptlrpc_req_refresh_ctx(req, -1);
1438        if (rc) {
1439                if (req->rq_err) {
1440                        req->rq_status = rc;
1441                        return 1;
1442                } else {
1443                        spin_lock(&req->rq_lock);
1444                        req->rq_wait_ctx = 1;
1445                        spin_unlock(&req->rq_lock);
1446                        return 0;
1447                }
1448        }
1449
1450        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1451               current_comm(),
1452               imp->imp_obd->obd_uuid.uuid,
1453               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1454               libcfs_nid2str(imp->imp_connection->c_peer.nid),
1455               lustre_msg_get_opc(req->rq_reqmsg));
1456
1457        rc = ptl_send_rpc(req, 0);
1458        if (rc) {
1459                DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
1460                spin_lock(&req->rq_lock);
1461                req->rq_net_err = 1;
1462                spin_unlock(&req->rq_lock);
1463                return rc;
1464        }
1465        return 0;
1466}
1467
1468static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
1469{
1470        int remaining, rc;
1471
1472        LASSERT(set->set_producer != NULL);
1473
1474        remaining = atomic_read(&set->set_remaining);
1475
1476        /* populate the ->set_requests list with requests until we
1477         * reach the maximum number of RPCs in flight for this set */
1478        while (atomic_read(&set->set_remaining) < set->set_max_inflight) {
1479                rc = set->set_producer(set, set->set_producer_arg);
1480                if (rc == -ENOENT) {
1481                        /* no more RPC to produce */
1482                        set->set_producer     = NULL;
1483                        set->set_producer_arg = NULL;
1484                        return 0;
1485                }
1486        }
1487
1488        return (atomic_read(&set->set_remaining) - remaining);
1489}
1490
1491/**
1492 * this sends any unsent RPCs in \a set and returns 1 if all are sent
1493 * and no more replies are expected.
1494 * (it is possible to get less replies than requests sent e.g. due to timed out
1495 * requests or requests that we had trouble to send out)
1496 *
1497 * NOTE: This function contains a potential schedule point (cond_resched()).
1498 */
1499int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
1500{
1501        struct list_head *tmp, *next;
1502        struct list_head comp_reqs;
1503        int force_timer_recalc = 0;
1504
1505        if (atomic_read(&set->set_remaining) == 0)
1506                return 1;
1507
1508        INIT_LIST_HEAD(&comp_reqs);
1509        list_for_each_safe(tmp, next, &set->set_requests) {
1510                struct ptlrpc_request *req =
1511                        list_entry(tmp, struct ptlrpc_request,
1512                                       rq_set_chain);
1513                struct obd_import *imp = req->rq_import;
1514                int unregistered = 0;
1515                int rc = 0;
1516
1517                /* This schedule point is mainly for the ptlrpcd caller of this
1518                 * function.  Most ptlrpc sets are not long-lived and unbounded
1519                 * in length, but at the least the set used by the ptlrpcd is.
1520                 * Since the processing time is unbounded, we need to insert an
1521                 * explicit schedule point to make the thread well-behaved.
1522                 */
1523                cond_resched();
1524
1525                if (req->rq_phase == RQ_PHASE_NEW &&
1526                    ptlrpc_send_new_req(req)) {
1527                        force_timer_recalc = 1;
1528                }
1529
1530                /* delayed send - skip */
1531                if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1532                        continue;
1533
1534                /* delayed resend - skip */
1535                if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
1536                    req->rq_sent > get_seconds())
1537                        continue;
1538
1539                if (!(req->rq_phase == RQ_PHASE_RPC ||
1540                      req->rq_phase == RQ_PHASE_BULK ||
1541                      req->rq_phase == RQ_PHASE_INTERPRET ||
1542                      req->rq_phase == RQ_PHASE_UNREGISTERING ||
1543                      req->rq_phase == RQ_PHASE_COMPLETE)) {
1544                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1545                        LBUG();
1546                }
1547
1548                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
1549                        LASSERT(req->rq_next_phase != req->rq_phase);
1550                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
1551
1552                        /*
1553                         * Skip processing until reply is unlinked. We
1554                         * can't return to pool before that and we can't
1555                         * call interpret before that. We need to make
1556                         * sure that all rdma transfers finished and will
1557                         * not corrupt any data.
1558                         */
1559                        if (ptlrpc_client_recv_or_unlink(req) ||
1560                            ptlrpc_client_bulk_active(req))
1561                                continue;
1562
1563                        /*
1564                         * Turn fail_loc off to prevent it from looping
1565                         * forever.
1566                         */
1567                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
1568                                OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
1569                                                     OBD_FAIL_ONCE);
1570                        }
1571                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
1572                                OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
1573                                                     OBD_FAIL_ONCE);
1574                        }
1575
1576                        /*
1577                         * Move to next phase if reply was successfully
1578                         * unlinked.
1579                         */
1580                        ptlrpc_rqphase_move(req, req->rq_next_phase);
1581                }
1582
1583                if (req->rq_phase == RQ_PHASE_COMPLETE) {
1584                        list_move_tail(&req->rq_set_chain, &comp_reqs);
1585                        continue;
1586                }
1587
1588                if (req->rq_phase == RQ_PHASE_INTERPRET)
1589                        goto interpret;
1590
1591                /*
1592                 * Note that this also will start async reply unlink.
1593                 */
1594                if (req->rq_net_err && !req->rq_timedout) {
1595                        ptlrpc_expire_one_request(req, 1);
1596
1597                        /*
1598                         * Check if we still need to wait for unlink.
1599                         */
1600                        if (ptlrpc_client_recv_or_unlink(req) ||
1601                            ptlrpc_client_bulk_active(req))
1602                                continue;
1603                        /* If there is no need to resend, fail it now. */
1604                        if (req->rq_no_resend) {
1605                                if (req->rq_status == 0)
1606                                        req->rq_status = -EIO;
1607                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1608                                goto interpret;
1609                        } else {
1610                                continue;
1611                        }
1612                }
1613
1614                if (req->rq_err) {
1615                        spin_lock(&req->rq_lock);
1616                        req->rq_replied = 0;
1617                        spin_unlock(&req->rq_lock);
1618                        if (req->rq_status == 0)
1619                                req->rq_status = -EIO;
1620                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1621                        goto interpret;
1622                }
1623
1624                /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
1625                 * so it sets rq_intr regardless of individual rpc
1626                 * timeouts. The synchronous IO waiting path sets
1627                 * rq_intr irrespective of whether ptlrpcd
1628                 * has seen a timeout.  Our policy is to only interpret
1629                 * interrupted rpcs after they have timed out, so we
1630                 * need to enforce that here.
1631                 */
1632
1633                if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
1634                                     req->rq_wait_ctx)) {
1635                        req->rq_status = -EINTR;
1636                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1637                        goto interpret;
1638                }
1639
1640                if (req->rq_phase == RQ_PHASE_RPC) {
1641                        if (req->rq_timedout || req->rq_resend ||
1642                            req->rq_waiting || req->rq_wait_ctx) {
1643                                int status;
1644
1645                                if (!ptlrpc_unregister_reply(req, 1))
1646                                        continue;
1647
1648                                spin_lock(&imp->imp_lock);
1649                                if (ptlrpc_import_delay_req(imp, req,
1650                                                            &status)) {
1651                                        /* put on delay list - only if we wait
1652                                         * recovery finished - before send */
1653                                        list_del_init(&req->rq_list);
1654                                        list_add_tail(&req->rq_list,
1655                                                          &imp->
1656                                                          imp_delayed_list);
1657                                        spin_unlock(&imp->imp_lock);
1658                                        continue;
1659                                }
1660
1661                                if (status != 0)  {
1662                                        req->rq_status = status;
1663                                        ptlrpc_rqphase_move(req,
1664                                                RQ_PHASE_INTERPRET);
1665                                        spin_unlock(&imp->imp_lock);
1666                                        goto interpret;
1667                                }
1668                                if (ptlrpc_no_resend(req) &&
1669                                    !req->rq_wait_ctx) {
1670                                        req->rq_status = -ENOTCONN;
1671                                        ptlrpc_rqphase_move(req,
1672                                                            RQ_PHASE_INTERPRET);
1673                                        spin_unlock(&imp->imp_lock);
1674                                        goto interpret;
1675                                }
1676
1677                                list_del_init(&req->rq_list);
1678                                list_add_tail(&req->rq_list,
1679                                                  &imp->imp_sending_list);
1680
1681                                spin_unlock(&imp->imp_lock);
1682
1683                                spin_lock(&req->rq_lock);
1684                                req->rq_waiting = 0;
1685                                spin_unlock(&req->rq_lock);
1686
1687                                if (req->rq_timedout || req->rq_resend) {
1688                                        /* This is re-sending anyways,
1689                                         * let's mark req as resend. */
1690                                        spin_lock(&req->rq_lock);
1691                                        req->rq_resend = 1;
1692                                        spin_unlock(&req->rq_lock);
1693                                        if (req->rq_bulk) {
1694                                                __u64 old_xid;
1695
1696                                                if (!ptlrpc_unregister_bulk(req, 1))
1697                                                        continue;
1698
1699                                                /* ensure previous bulk fails */
1700                                                old_xid = req->rq_xid;
1701                                                req->rq_xid = ptlrpc_next_xid();
1702                                                CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
1703                                                       old_xid, req->rq_xid);
1704                                        }
1705                                }
1706                                /*
1707                                 * rq_wait_ctx is only touched by ptlrpcd,
1708                                 * so no lock is needed here.
1709                                 */
1710                                status = sptlrpc_req_refresh_ctx(req, -1);
1711                                if (status) {
1712                                        if (req->rq_err) {
1713                                                req->rq_status = status;
1714                                                spin_lock(&req->rq_lock);
1715                                                req->rq_wait_ctx = 0;
1716                                                spin_unlock(&req->rq_lock);
1717                                                force_timer_recalc = 1;
1718                                        } else {
1719                                                spin_lock(&req->rq_lock);
1720                                                req->rq_wait_ctx = 1;
1721                                                spin_unlock(&req->rq_lock);
1722                                        }
1723
1724                                        continue;
1725                                } else {
1726                                        spin_lock(&req->rq_lock);
1727                                        req->rq_wait_ctx = 0;
1728                                        spin_unlock(&req->rq_lock);
1729                                }
1730
1731                                rc = ptl_send_rpc(req, 0);
1732                                if (rc) {
1733                                        DEBUG_REQ(D_HA, req,
1734                                                  "send failed: rc = %d", rc);
1735                                        force_timer_recalc = 1;
1736                                        spin_lock(&req->rq_lock);
1737                                        req->rq_net_err = 1;
1738                                        spin_unlock(&req->rq_lock);
1739                                        continue;
1740                                }
1741                                /* need to reset the timeout */
1742                                force_timer_recalc = 1;
1743                        }
1744
1745                        spin_lock(&req->rq_lock);
1746
1747                        if (ptlrpc_client_early(req)) {
1748                                ptlrpc_at_recv_early_reply(req);
1749                                spin_unlock(&req->rq_lock);
1750                                continue;
1751                        }
1752
1753                        /* Still waiting for a reply? */
1754                        if (ptlrpc_client_recv(req)) {
1755                                spin_unlock(&req->rq_lock);
1756                                continue;
1757                        }
1758
1759                        /* Did we actually receive a reply? */
1760                        if (!ptlrpc_client_replied(req)) {
1761                                spin_unlock(&req->rq_lock);
1762                                continue;
1763                        }
1764
1765                        spin_unlock(&req->rq_lock);
1766
1767                        /* unlink from net because we are going to
1768                         * swab in-place of reply buffer */
1769                        unregistered = ptlrpc_unregister_reply(req, 1);
1770                        if (!unregistered)
1771                                continue;
1772
1773                        req->rq_status = after_reply(req);
1774                        if (req->rq_resend)
1775                                continue;
1776
1777                        /* If there is no bulk associated with this request,
1778                         * then we're done and should let the interpreter
1779                         * process the reply. Similarly if the RPC returned
1780                         * an error, and therefore the bulk will never arrive.
1781                         */
1782                        if (req->rq_bulk == NULL || req->rq_status < 0) {
1783                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1784                                goto interpret;
1785                        }
1786
1787                        ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
1788                }
1789
1790                LASSERT(req->rq_phase == RQ_PHASE_BULK);
1791                if (ptlrpc_client_bulk_active(req))
1792                        continue;
1793
1794                if (req->rq_bulk->bd_failure) {
1795                        /* The RPC reply arrived OK, but the bulk screwed
1796                         * up!  Dead weird since the server told us the RPC
1797                         * was good after getting the REPLY for her GET or
1798                         * the ACK for her PUT. */
1799                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1800                        req->rq_status = -EIO;
1801                }
1802
1803                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1804
1805interpret:
1806                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
1807
1808                /* This moves to "unregistering" phase we need to wait for
1809                 * reply unlink. */
1810                if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
1811                        /* start async bulk unlink too */
1812                        ptlrpc_unregister_bulk(req, 1);
1813                        continue;
1814                }
1815
1816                if (!ptlrpc_unregister_bulk(req, 1))
1817                        continue;
1818
1819                /* When calling interpret receiving already should be
1820                 * finished. */
1821                LASSERT(!req->rq_receiving_reply);
1822
1823                ptlrpc_req_interpret(env, req, req->rq_status);
1824
1825                if (ptlrpcd_check_work(req)) {
1826                        atomic_dec(&set->set_remaining);
1827                        continue;
1828                }
1829                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
1830
1831                CDEBUG(req->rq_reqmsg != NULL ? D_RPCTRACE : 0,
1832                       "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1833                       current_comm(), imp->imp_obd->obd_uuid.uuid,
1834                       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1835                       libcfs_nid2str(imp->imp_connection->c_peer.nid),
1836                       lustre_msg_get_opc(req->rq_reqmsg));
1837
1838                spin_lock(&imp->imp_lock);
1839                /* Request already may be not on sending or delaying list. This
1840                 * may happen in the case of marking it erroneous for the case
1841                 * ptlrpc_import_delay_req(req, status) find it impossible to
1842                 * allow sending this rpc and returns *status != 0. */
1843                if (!list_empty(&req->rq_list)) {
1844                        list_del_init(&req->rq_list);
1845                        atomic_dec(&imp->imp_inflight);
1846                }
1847                spin_unlock(&imp->imp_lock);
1848
1849                atomic_dec(&set->set_remaining);
1850                wake_up_all(&imp->imp_recovery_waitq);
1851
1852                if (set->set_producer) {
1853                        /* produce a new request if possible */
1854                        if (ptlrpc_set_producer(set) > 0)
1855                                force_timer_recalc = 1;
1856
1857                        /* free the request that has just been completed
1858                         * in order not to pollute set->set_requests */
1859                        list_del_init(&req->rq_set_chain);
1860                        spin_lock(&req->rq_lock);
1861                        req->rq_set = NULL;
1862                        req->rq_invalid_rqset = 0;
1863                        spin_unlock(&req->rq_lock);
1864
1865                        /* record rq_status to compute the final status later */
1866                        if (req->rq_status != 0)
1867                                set->set_rc = req->rq_status;
1868                        ptlrpc_req_finished(req);
1869                } else {
1870                        list_move_tail(&req->rq_set_chain, &comp_reqs);
1871                }
1872        }
1873
1874        /* move completed request at the head of list so it's easier for
1875         * caller to find them */
1876        list_splice(&comp_reqs, &set->set_requests);
1877
1878        /* If we hit an error, we want to recover promptly. */
1879        return atomic_read(&set->set_remaining) == 0 || force_timer_recalc;
1880}
1881EXPORT_SYMBOL(ptlrpc_check_set);
1882
1883/**
1884 * Time out request \a req. is \a async_unlink is set, that means do not wait
1885 * until LNet actually confirms network buffer unlinking.
1886 * Return 1 if we should give up further retrying attempts or 0 otherwise.
1887 */
1888int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
1889{
1890        struct obd_import *imp = req->rq_import;
1891        int rc = 0;
1892
1893        spin_lock(&req->rq_lock);
1894        req->rq_timedout = 1;
1895        spin_unlock(&req->rq_lock);
1896
1897        DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent "CFS_DURATION_T
1898                  "/real "CFS_DURATION_T"]",
1899                  req->rq_net_err ? "failed due to network error" :
1900                     ((req->rq_real_sent == 0 ||
1901                       time_before((unsigned long)req->rq_real_sent, (unsigned long)req->rq_sent) ||
1902                       cfs_time_aftereq(req->rq_real_sent, req->rq_deadline)) ?
1903                      "timed out for sent delay" : "timed out for slow reply"),
1904                  req->rq_sent, req->rq_real_sent);
1905
1906        if (imp != NULL && obd_debug_peer_on_timeout)
1907                LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
1908
1909        ptlrpc_unregister_reply(req, async_unlink);
1910        ptlrpc_unregister_bulk(req, async_unlink);
1911
1912        if (obd_dump_on_timeout)
1913                libcfs_debug_dumplog();
1914
1915        if (imp == NULL) {
1916                DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1917                return 1;
1918        }
1919
1920        atomic_inc(&imp->imp_timeouts);
1921
1922        /* The DLM server doesn't want recovery run on its imports. */
1923        if (imp->imp_dlm_fake)
1924                return 1;
1925
1926        /* If this request is for recovery or other primordial tasks,
1927         * then error it out here. */
1928        if (req->rq_ctx_init || req->rq_ctx_fini ||
1929            req->rq_send_state != LUSTRE_IMP_FULL ||
1930            imp->imp_obd->obd_no_recov) {
1931                DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1932                          ptlrpc_import_state_name(req->rq_send_state),
1933                          ptlrpc_import_state_name(imp->imp_state));
1934                spin_lock(&req->rq_lock);
1935                req->rq_status = -ETIMEDOUT;
1936                req->rq_err = 1;
1937                spin_unlock(&req->rq_lock);
1938                return 1;
1939        }
1940
1941        /* if a request can't be resent we can't wait for an answer after
1942           the timeout */
1943        if (ptlrpc_no_resend(req)) {
1944                DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1945                rc = 1;
1946        }
1947
1948        ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1949
1950        return rc;
1951}
1952
1953/**
1954 * Time out all uncompleted requests in request set pointed by \a data
1955 * Callback used when waiting on sets with l_wait_event.
1956 * Always returns 1.
1957 */
1958int ptlrpc_expired_set(void *data)
1959{
1960        struct ptlrpc_request_set *set = data;
1961        struct list_head                *tmp;
1962        time_t               now = get_seconds();
1963
1964        LASSERT(set != NULL);
1965
1966        /*
1967         * A timeout expired. See which reqs it applies to...
1968         */
1969        list_for_each(tmp, &set->set_requests) {
1970                struct ptlrpc_request *req =
1971                        list_entry(tmp, struct ptlrpc_request,
1972                                       rq_set_chain);
1973
1974                /* don't expire request waiting for context */
1975                if (req->rq_wait_ctx)
1976                        continue;
1977
1978                /* Request in-flight? */
1979                if (!((req->rq_phase == RQ_PHASE_RPC &&
1980                       !req->rq_waiting && !req->rq_resend) ||
1981                      (req->rq_phase == RQ_PHASE_BULK)))
1982                        continue;
1983
1984                if (req->rq_timedout ||     /* already dealt with */
1985                    req->rq_deadline > now) /* not expired */
1986                        continue;
1987
1988                /* Deal with this guy. Do it asynchronously to not block
1989                 * ptlrpcd thread. */
1990                ptlrpc_expire_one_request(req, 1);
1991        }
1992
1993        /*
1994         * When waiting for a whole set, we always break out of the
1995         * sleep so we can recalculate the timeout, or enable interrupts
1996         * if everyone's timed out.
1997         */
1998        return 1;
1999}
2000EXPORT_SYMBOL(ptlrpc_expired_set);
2001
2002/**
2003 * Sets rq_intr flag in \a req under spinlock.
2004 */
2005void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
2006{
2007        spin_lock(&req->rq_lock);
2008        req->rq_intr = 1;
2009        spin_unlock(&req->rq_lock);
2010}
2011EXPORT_SYMBOL(ptlrpc_mark_interrupted);
2012
2013/**
2014 * Interrupts (sets interrupted flag) all uncompleted requests in
2015 * a set \a data. Callback for l_wait_event for interruptible waits.
2016 */
2017void ptlrpc_interrupted_set(void *data)
2018{
2019        struct ptlrpc_request_set *set = data;
2020        struct list_head *tmp;
2021
2022        LASSERT(set != NULL);
2023        CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
2024
2025        list_for_each(tmp, &set->set_requests) {
2026                struct ptlrpc_request *req =
2027                        list_entry(tmp, struct ptlrpc_request,
2028                                       rq_set_chain);
2029
2030                if (req->rq_phase != RQ_PHASE_RPC &&
2031                    req->rq_phase != RQ_PHASE_UNREGISTERING)
2032                        continue;
2033
2034                ptlrpc_mark_interrupted(req);
2035        }
2036}
2037EXPORT_SYMBOL(ptlrpc_interrupted_set);
2038
2039/**
2040 * Get the smallest timeout in the set; this does NOT set a timeout.
2041 */
2042int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
2043{
2044        struct list_head            *tmp;
2045        time_t           now = get_seconds();
2046        int                 timeout = 0;
2047        struct ptlrpc_request *req;
2048        int                 deadline;
2049
2050        list_for_each(tmp, &set->set_requests) {
2051                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2052
2053                /*
2054                 * Request in-flight?
2055                 */
2056                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
2057                      (req->rq_phase == RQ_PHASE_BULK) ||
2058                      (req->rq_phase == RQ_PHASE_NEW)))
2059                        continue;
2060
2061                /*
2062                 * Already timed out.
2063                 */
2064                if (req->rq_timedout)
2065                        continue;
2066
2067                /*
2068                 * Waiting for ctx.
2069                 */
2070                if (req->rq_wait_ctx)
2071                        continue;
2072
2073                if (req->rq_phase == RQ_PHASE_NEW)
2074                        deadline = req->rq_sent;
2075                else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
2076                        deadline = req->rq_sent;
2077                else
2078                        deadline = req->rq_sent + req->rq_timeout;
2079
2080                if (deadline <= now)    /* actually expired already */
2081                        timeout = 1;    /* ASAP */
2082                else if (timeout == 0 || timeout > deadline - now)
2083                        timeout = deadline - now;
2084        }
2085        return timeout;
2086}
2087EXPORT_SYMBOL(ptlrpc_set_next_timeout);
2088
2089/**
2090 * Send all unset request from the set and then wait until all
2091 * requests in the set complete (either get a reply, timeout, get an
2092 * error or otherwise be interrupted).
2093 * Returns 0 on success or error code otherwise.
2094 */
2095int ptlrpc_set_wait(struct ptlrpc_request_set *set)
2096{
2097        struct list_head            *tmp;
2098        struct ptlrpc_request *req;
2099        struct l_wait_info     lwi;
2100        int                 rc, timeout;
2101
2102        if (set->set_producer)
2103                (void)ptlrpc_set_producer(set);
2104        else
2105                list_for_each(tmp, &set->set_requests) {
2106                        req = list_entry(tmp, struct ptlrpc_request,
2107                                             rq_set_chain);
2108                        if (req->rq_phase == RQ_PHASE_NEW)
2109                                (void)ptlrpc_send_new_req(req);
2110                }
2111
2112        if (list_empty(&set->set_requests))
2113                return 0;
2114
2115        do {
2116                timeout = ptlrpc_set_next_timeout(set);
2117
2118                /* wait until all complete, interrupted, or an in-flight
2119                 * req times out */
2120                CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
2121                       set, timeout);
2122
2123                if (timeout == 0 && !cfs_signal_pending())
2124                        /*
2125                         * No requests are in-flight (ether timed out
2126                         * or delayed), so we can allow interrupts.
2127                         * We still want to block for a limited time,
2128                         * so we allow interrupts during the timeout.
2129                         */
2130                        lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
2131                                                   ptlrpc_expired_set,
2132                                                   ptlrpc_interrupted_set, set);
2133                else
2134                        /*
2135                         * At least one request is in flight, so no
2136                         * interrupts are allowed. Wait until all
2137                         * complete, or an in-flight req times out.
2138                         */
2139                        lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
2140                                          ptlrpc_expired_set, set);
2141
2142                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
2143
2144                /* LU-769 - if we ignored the signal because it was already
2145                 * pending when we started, we need to handle it now or we risk
2146                 * it being ignored forever */
2147                if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
2148                    cfs_signal_pending()) {
2149                        sigset_t blocked_sigs =
2150                                           cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
2151
2152                        /* In fact we only interrupt for the "fatal" signals
2153                         * like SIGINT or SIGKILL. We still ignore less
2154                         * important signals since ptlrpc set is not easily
2155                         * reentrant from userspace again */
2156                        if (cfs_signal_pending())
2157                                ptlrpc_interrupted_set(set);
2158                        cfs_restore_sigs(blocked_sigs);
2159                }
2160
2161                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
2162
2163                /* -EINTR => all requests have been flagged rq_intr so next
2164                 * check completes.
2165                 * -ETIMEDOUT => someone timed out.  When all reqs have
2166                 * timed out, signals are enabled allowing completion with
2167                 * EINTR.
2168                 * I don't really care if we go once more round the loop in
2169                 * the error cases -eeb. */
2170                if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
2171                        list_for_each(tmp, &set->set_requests) {
2172                                req = list_entry(tmp, struct ptlrpc_request,
2173                                                     rq_set_chain);
2174                                spin_lock(&req->rq_lock);
2175                                req->rq_invalid_rqset = 1;
2176                                spin_unlock(&req->rq_lock);
2177                        }
2178                }
2179        } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
2180
2181        LASSERT(atomic_read(&set->set_remaining) == 0);
2182
2183        rc = set->set_rc; /* rq_status of already freed requests if any */
2184        list_for_each(tmp, &set->set_requests) {
2185                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2186
2187                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
2188                if (req->rq_status != 0)
2189                        rc = req->rq_status;
2190        }
2191
2192        if (set->set_interpret != NULL) {
2193                int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
2194                        set->set_interpret;
2195                rc = interpreter (set, set->set_arg, rc);
2196        } else {
2197                struct ptlrpc_set_cbdata *cbdata, *n;
2198                int err;
2199
2200                list_for_each_entry_safe(cbdata, n,
2201                                         &set->set_cblist, psc_item) {
2202                        list_del_init(&cbdata->psc_item);
2203                        err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
2204                        if (err && !rc)
2205                                rc = err;
2206                        OBD_FREE_PTR(cbdata);
2207                }
2208        }
2209
2210        return rc;
2211}
2212EXPORT_SYMBOL(ptlrpc_set_wait);
2213
2214/**
2215 * Helper function for request freeing.
2216 * Called when request count reached zero and request needs to be freed.
2217 * Removes request from all sorts of sending/replay lists it might be on,
2218 * frees network buffers if any are present.
2219 * If \a locked is set, that means caller is already holding import imp_lock
2220 * and so we no longer need to reobtain it (for certain lists manipulations)
2221 */
2222static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
2223{
2224        if (request == NULL)
2225                return;
2226        LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
2227        LASSERTF(request->rq_rqbd == NULL, "req %p\n", request);/* client-side */
2228        LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
2229        LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
2230        LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request);
2231        LASSERTF(!request->rq_replay, "req %p\n", request);
2232
2233        req_capsule_fini(&request->rq_pill);
2234
2235        /* We must take it off the imp_replay_list first.  Otherwise, we'll set
2236         * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
2237        if (request->rq_import != NULL) {
2238                if (!locked)
2239                        spin_lock(&request->rq_import->imp_lock);
2240                list_del_init(&request->rq_replay_list);
2241                if (!locked)
2242                        spin_unlock(&request->rq_import->imp_lock);
2243        }
2244        LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
2245
2246        if (atomic_read(&request->rq_refcount) != 0) {
2247                DEBUG_REQ(D_ERROR, request,
2248                          "freeing request with nonzero refcount");
2249                LBUG();
2250        }
2251
2252        if (request->rq_repbuf != NULL)
2253                sptlrpc_cli_free_repbuf(request);
2254        if (request->rq_export != NULL) {
2255                class_export_put(request->rq_export);
2256                request->rq_export = NULL;
2257        }
2258        if (request->rq_import != NULL) {
2259                class_import_put(request->rq_import);
2260                request->rq_import = NULL;
2261        }
2262        if (request->rq_bulk != NULL)
2263                ptlrpc_free_bulk_pin(request->rq_bulk);
2264
2265        if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL)
2266                sptlrpc_cli_free_reqbuf(request);
2267
2268        if (request->rq_cli_ctx)
2269                sptlrpc_req_put_ctx(request, !locked);
2270
2271        if (request->rq_pool)
2272                __ptlrpc_free_req_to_pool(request);
2273        else
2274                ptlrpc_request_cache_free(request);
2275}
2276
2277static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
2278/**
2279 * Drop one request reference. Must be called with import imp_lock held.
2280 * When reference count drops to zero, request is freed.
2281 */
2282void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
2283{
2284        assert_spin_locked(&request->rq_import->imp_lock);
2285        (void)__ptlrpc_req_finished(request, 1);
2286}
2287EXPORT_SYMBOL(ptlrpc_req_finished_with_imp_lock);
2288
2289/**
2290 * Helper function
2291 * Drops one reference count for request \a request.
2292 * \a locked set indicates that caller holds import imp_lock.
2293 * Frees the request when reference count reaches zero.
2294 */
2295static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
2296{
2297        if (request == NULL)
2298                return 1;
2299
2300        if (request == LP_POISON ||
2301            request->rq_reqmsg == LP_POISON) {
2302                CERROR("dereferencing freed request (bug 575)\n");
2303                LBUG();
2304                return 1;
2305        }
2306
2307        DEBUG_REQ(D_INFO, request, "refcount now %u",
2308                  atomic_read(&request->rq_refcount) - 1);
2309
2310        if (atomic_dec_and_test(&request->rq_refcount)) {
2311                __ptlrpc_free_req(request, locked);
2312                return 1;
2313        }
2314
2315        return 0;
2316}
2317
2318/**
2319 * Drops one reference count for a request.
2320 */
2321void ptlrpc_req_finished(struct ptlrpc_request *request)
2322{
2323        __ptlrpc_req_finished(request, 0);
2324}
2325EXPORT_SYMBOL(ptlrpc_req_finished);
2326
2327/**
2328 * Returns xid of a \a request
2329 */
2330__u64 ptlrpc_req_xid(struct ptlrpc_request *request)
2331{
2332        return request->rq_xid;
2333}
2334EXPORT_SYMBOL(ptlrpc_req_xid);
2335
2336/**
2337 * Disengage the client's reply buffer from the network
2338 * NB does _NOT_ unregister any client-side bulk.
2339 * IDEMPOTENT, but _not_ safe against concurrent callers.
2340 * The request owner (i.e. the thread doing the I/O) must call...
2341 * Returns 0 on success or 1 if unregistering cannot be made.
2342 */
2343int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
2344{
2345        int             rc;
2346        wait_queue_head_t       *wq;
2347        struct l_wait_info lwi;
2348
2349        /*
2350         * Might sleep.
2351         */
2352        LASSERT(!in_interrupt());
2353
2354        /*
2355         * Let's setup deadline for reply unlink.
2356         */
2357        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2358            async && request->rq_reply_deadline == 0)
2359                request->rq_reply_deadline = get_seconds()+LONG_UNLINK;
2360
2361        /*
2362         * Nothing left to do.
2363         */
2364        if (!ptlrpc_client_recv_or_unlink(request))
2365                return 1;
2366
2367        LNetMDUnlink(request->rq_reply_md_h);
2368
2369        /*
2370         * Let's check it once again.
2371         */
2372        if (!ptlrpc_client_recv_or_unlink(request))
2373                return 1;
2374
2375        /*
2376         * Move to "Unregistering" phase as reply was not unlinked yet.
2377         */
2378        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
2379
2380        /*
2381         * Do not wait for unlink to finish.
2382         */
2383        if (async)
2384                return 0;
2385
2386        /*
2387         * We have to l_wait_event() whatever the result, to give liblustre
2388         * a chance to run reply_in_callback(), and to make sure we've
2389         * unlinked before returning a req to the pool.
2390         */
2391        if (request->rq_set != NULL)
2392                wq = &request->rq_set->set_waitq;
2393        else
2394                wq = &request->rq_reply_waitq;
2395
2396        for (;;) {
2397                /* Network access will complete in finite time but the HUGE
2398                 * timeout lets us CWARN for visibility of sluggish NALs */
2399                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
2400                                           cfs_time_seconds(1), NULL, NULL);
2401                rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
2402                                  &lwi);
2403                if (rc == 0) {
2404                        ptlrpc_rqphase_move(request, request->rq_next_phase);
2405                        return 1;
2406                }
2407
2408                LASSERT(rc == -ETIMEDOUT);
2409                DEBUG_REQ(D_WARNING, request,
2410                          "Unexpectedly long timeout rvcng=%d unlnk=%d/%d",
2411                          request->rq_receiving_reply,
2412                          request->rq_req_unlink, request->rq_reply_unlink);
2413        }
2414        return 0;
2415}
2416EXPORT_SYMBOL(ptlrpc_unregister_reply);
2417
2418static void ptlrpc_free_request(struct ptlrpc_request *req)
2419{
2420        spin_lock(&req->rq_lock);
2421        req->rq_replay = 0;
2422        spin_unlock(&req->rq_lock);
2423
2424        if (req->rq_commit_cb != NULL)
2425                req->rq_commit_cb(req);
2426        list_del_init(&req->rq_replay_list);
2427
2428        __ptlrpc_req_finished(req, 1);
2429}
2430
2431/**
2432 * the request is committed and dropped from the replay list of its import
2433 */
2434void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
2435{
2436        struct obd_import       *imp = req->rq_import;
2437
2438        spin_lock(&imp->imp_lock);
2439        if (list_empty(&req->rq_replay_list)) {
2440                spin_unlock(&imp->imp_lock);
2441                return;
2442        }
2443
2444        if (force || req->rq_transno <= imp->imp_peer_committed_transno)
2445                ptlrpc_free_request(req);
2446
2447        spin_unlock(&imp->imp_lock);
2448}
2449EXPORT_SYMBOL(ptlrpc_request_committed);
2450
2451/**
2452 * Iterates through replay_list on import and prunes
2453 * all requests have transno smaller than last_committed for the
2454 * import and don't have rq_replay set.
2455 * Since requests are sorted in transno order, stops when meeting first
2456 * transno bigger than last_committed.
2457 * caller must hold imp->imp_lock
2458 */
2459void ptlrpc_free_committed(struct obd_import *imp)
2460{
2461        struct ptlrpc_request *req, *saved;
2462        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
2463        bool                   skip_committed_list = true;
2464
2465        LASSERT(imp != NULL);
2466        assert_spin_locked(&imp->imp_lock);
2467
2468        if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
2469            imp->imp_generation == imp->imp_last_generation_checked) {
2470                CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
2471                       imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
2472                return;
2473        }
2474        CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
2475               imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
2476               imp->imp_generation);
2477
2478        if (imp->imp_generation != imp->imp_last_generation_checked)
2479                skip_committed_list = false;
2480
2481        imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
2482        imp->imp_last_generation_checked = imp->imp_generation;
2483
2484        list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
2485                                 rq_replay_list) {
2486                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
2487                LASSERT(req != last_req);
2488                last_req = req;
2489
2490                if (req->rq_transno == 0) {
2491                        DEBUG_REQ(D_EMERG, req, "zero transno during replay");
2492                        LBUG();
2493                }
2494                if (req->rq_import_generation < imp->imp_generation) {
2495                        DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
2496                        goto free_req;
2497                }
2498
2499                /* not yet committed */
2500                if (req->rq_transno > imp->imp_peer_committed_transno) {
2501                        DEBUG_REQ(D_RPCTRACE, req, "stopping search");
2502                        break;
2503                }
2504
2505                if (req->rq_replay) {
2506                        DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
2507                        list_move_tail(&req->rq_replay_list,
2508                                       &imp->imp_committed_list);
2509                        continue;
2510                }
2511
2512                DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
2513                          imp->imp_peer_committed_transno);
2514free_req:
2515                ptlrpc_free_request(req);
2516        }
2517        if (skip_committed_list)
2518                return;
2519
2520        list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
2521                                 rq_replay_list) {
2522                LASSERT(req->rq_transno != 0);
2523                if (req->rq_import_generation < imp->imp_generation) {
2524                        DEBUG_REQ(D_RPCTRACE, req, "free stale open request");
2525                        ptlrpc_free_request(req);
2526                }
2527        }
2528}
2529
2530void ptlrpc_cleanup_client(struct obd_import *imp)
2531{
2532}
2533EXPORT_SYMBOL(ptlrpc_cleanup_client);
2534
2535/**
2536 * Schedule previously sent request for resend.
2537 * For bulk requests we assign new xid (to avoid problems with
2538 * lost replies and therefore several transfers landing into same buffer
2539 * from different sending attempts).
2540 */
2541void ptlrpc_resend_req(struct ptlrpc_request *req)
2542{
2543        DEBUG_REQ(D_HA, req, "going to resend");
2544        spin_lock(&req->rq_lock);
2545
2546        /* Request got reply but linked to the import list still.
2547           Let ptlrpc_check_set() to process it. */
2548        if (ptlrpc_client_replied(req)) {
2549                spin_unlock(&req->rq_lock);
2550                DEBUG_REQ(D_HA, req, "it has reply, so skip it");
2551                return;
2552        }
2553
2554        lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
2555        req->rq_status = -EAGAIN;
2556
2557        req->rq_resend = 1;
2558        req->rq_net_err = 0;
2559        req->rq_timedout = 0;
2560        if (req->rq_bulk) {
2561                __u64 old_xid = req->rq_xid;
2562
2563                /* ensure previous bulk fails */
2564                req->rq_xid = ptlrpc_next_xid();
2565                CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
2566                       old_xid, req->rq_xid);
2567        }
2568        ptlrpc_client_wake_req(req);
2569        spin_unlock(&req->rq_lock);
2570}
2571EXPORT_SYMBOL(ptlrpc_resend_req);
2572
2573/* XXX: this function and rq_status are currently unused */
2574void ptlrpc_restart_req(struct ptlrpc_request *req)
2575{
2576        DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
2577        req->rq_status = -ERESTARTSYS;
2578
2579        spin_lock(&req->rq_lock);
2580        req->rq_restart = 1;
2581        req->rq_timedout = 0;
2582        ptlrpc_client_wake_req(req);
2583        spin_unlock(&req->rq_lock);
2584}
2585EXPORT_SYMBOL(ptlrpc_restart_req);
2586
2587/**
2588 * Grab additional reference on a request \a req
2589 */
2590struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
2591{
2592        atomic_inc(&req->rq_refcount);
2593        return req;
2594}
2595EXPORT_SYMBOL(ptlrpc_request_addref);
2596
2597/**
2598 * Add a request to import replay_list.
2599 * Must be called under imp_lock
2600 */
2601void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
2602                                      struct obd_import *imp)
2603{
2604        struct list_head *tmp;
2605
2606        assert_spin_locked(&imp->imp_lock);
2607
2608        if (req->rq_transno == 0) {
2609                DEBUG_REQ(D_EMERG, req, "saving request with zero transno");
2610                LBUG();
2611        }
2612
2613        /* clear this for new requests that were resent as well
2614           as resent replayed requests. */
2615        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2616
2617        /* don't re-add requests that have been replayed */
2618        if (!list_empty(&req->rq_replay_list))
2619                return;
2620
2621        lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
2622
2623        LASSERT(imp->imp_replayable);
2624        /* Balanced in ptlrpc_free_committed, usually. */
2625        ptlrpc_request_addref(req);
2626        list_for_each_prev(tmp, &imp->imp_replay_list) {
2627                struct ptlrpc_request *iter =
2628                        list_entry(tmp, struct ptlrpc_request,
2629                                       rq_replay_list);
2630
2631                /* We may have duplicate transnos if we create and then
2632                 * open a file, or for closes retained if to match creating
2633                 * opens, so use req->rq_xid as a secondary key.
2634                 * (See bugs 684, 685, and 428.)
2635                 * XXX no longer needed, but all opens need transnos!
2636                 */
2637                if (iter->rq_transno > req->rq_transno)
2638                        continue;
2639
2640                if (iter->rq_transno == req->rq_transno) {
2641                        LASSERT(iter->rq_xid != req->rq_xid);
2642                        if (iter->rq_xid > req->rq_xid)
2643                                continue;
2644                }
2645
2646                list_add(&req->rq_replay_list, &iter->rq_replay_list);
2647                return;
2648        }
2649
2650        list_add(&req->rq_replay_list, &imp->imp_replay_list);
2651}
2652EXPORT_SYMBOL(ptlrpc_retain_replayable_request);
2653
2654/**
2655 * Send request and wait until it completes.
2656 * Returns request processing status.
2657 */
2658int ptlrpc_queue_wait(struct ptlrpc_request *req)
2659{
2660        struct ptlrpc_request_set *set;
2661        int rc;
2662
2663        LASSERT(req->rq_set == NULL);
2664        LASSERT(!req->rq_receiving_reply);
2665
2666        set = ptlrpc_prep_set();
2667        if (set == NULL) {
2668                CERROR("Unable to allocate ptlrpc set.");
2669                return -ENOMEM;
2670        }
2671
2672        /* for distributed debugging */
2673        lustre_msg_set_status(req->rq_reqmsg, current_pid());
2674
2675        /* add a ref for the set (see comment in ptlrpc_set_add_req) */
2676        ptlrpc_request_addref(req);
2677        ptlrpc_set_add_req(set, req);
2678        rc = ptlrpc_set_wait(set);
2679        ptlrpc_set_destroy(set);
2680
2681        return rc;
2682}
2683EXPORT_SYMBOL(ptlrpc_queue_wait);
2684
2685struct ptlrpc_replay_async_args {
2686        int praa_old_state;
2687        int praa_old_status;
2688};
2689
2690/**
2691 * Callback used for replayed requests reply processing.
2692 * In case of successful reply calls registered request replay callback.
2693 * In case of error restart replay process.
2694 */
2695static int ptlrpc_replay_interpret(const struct lu_env *env,
2696                                   struct ptlrpc_request *req,
2697                                   void *data, int rc)
2698{
2699        struct ptlrpc_replay_async_args *aa = data;
2700        struct obd_import *imp = req->rq_import;
2701
2702        atomic_dec(&imp->imp_replay_inflight);
2703
2704        if (!ptlrpc_client_replied(req)) {
2705                CERROR("request replay timed out, restarting recovery\n");
2706                rc = -ETIMEDOUT;
2707                goto out;
2708        }
2709
2710        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
2711            (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
2712             lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) {
2713                rc = lustre_msg_get_status(req->rq_repmsg);
2714                goto out;
2715        }
2716
2717        /** VBR: check version failure */
2718        if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
2719                /** replay was failed due to version mismatch */
2720                DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
2721                spin_lock(&imp->imp_lock);
2722                imp->imp_vbr_failed = 1;
2723                imp->imp_no_lock_replay = 1;
2724                spin_unlock(&imp->imp_lock);
2725                lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2726        } else {
2727                /** The transno had better not change over replay. */
2728                LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
2729                         lustre_msg_get_transno(req->rq_repmsg) ||
2730                         lustre_msg_get_transno(req->rq_repmsg) == 0,
2731                         "%#llx/%#llx\n",
2732                         lustre_msg_get_transno(req->rq_reqmsg),
2733                         lustre_msg_get_transno(req->rq_repmsg));
2734        }
2735
2736        spin_lock(&imp->imp_lock);
2737        /** if replays by version then gap occur on server, no trust to locks */
2738        if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
2739                imp->imp_no_lock_replay = 1;
2740        imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
2741        spin_unlock(&imp->imp_lock);
2742        LASSERT(imp->imp_last_replay_transno);
2743
2744        /* transaction number shouldn't be bigger than the latest replayed */
2745        if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
2746                DEBUG_REQ(D_ERROR, req,
2747                          "Reported transno %llu is bigger than the replayed one: %llu",
2748                          req->rq_transno,
2749                          lustre_msg_get_transno(req->rq_reqmsg));
2750                rc = -EINVAL;
2751                goto out;
2752        }
2753
2754        DEBUG_REQ(D_HA, req, "got rep");
2755
2756        /* let the callback do fixups, possibly including in the request */
2757        if (req->rq_replay_cb)
2758                req->rq_replay_cb(req);
2759
2760        if (ptlrpc_client_replied(req) &&
2761            lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
2762                DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
2763                          lustre_msg_get_status(req->rq_repmsg),
2764                          aa->praa_old_status);
2765        } else {
2766                /* Put it back for re-replay. */
2767                lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2768        }
2769
2770        /*
2771         * Errors while replay can set transno to 0, but
2772         * imp_last_replay_transno shouldn't be set to 0 anyway
2773         */
2774        if (req->rq_transno == 0)
2775                CERROR("Transno is 0 during replay!\n");
2776
2777        /* continue with recovery */
2778        rc = ptlrpc_import_recovery_state_machine(imp);
2779 out:
2780        req->rq_send_state = aa->praa_old_state;
2781
2782        if (rc != 0)
2783                /* this replay failed, so restart recovery */
2784                ptlrpc_connect_import(imp);
2785
2786        return rc;
2787}
2788
2789/**
2790 * Prepares and queues request for replay.
2791 * Adds it to ptlrpcd queue for actual sending.
2792 * Returns 0 on success.
2793 */
2794int ptlrpc_replay_req(struct ptlrpc_request *req)
2795{
2796        struct ptlrpc_replay_async_args *aa;
2797
2798        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
2799
2800        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2801        aa = ptlrpc_req_async_args(req);
2802        memset(aa, 0, sizeof(*aa));
2803
2804        /* Prepare request to be resent with ptlrpcd */
2805        aa->praa_old_state = req->rq_send_state;
2806        req->rq_send_state = LUSTRE_IMP_REPLAY;
2807        req->rq_phase = RQ_PHASE_NEW;
2808        req->rq_next_phase = RQ_PHASE_UNDEFINED;
2809        if (req->rq_repmsg)
2810                aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
2811        req->rq_status = 0;
2812        req->rq_interpret_reply = ptlrpc_replay_interpret;
2813        /* Readjust the timeout for current conditions */
2814        ptlrpc_at_set_req_timeout(req);
2815
2816        /* Tell server the net_latency, so the server can calculate how long
2817         * it should wait for next replay */
2818        lustre_msg_set_service_time(req->rq_reqmsg,
2819                                    ptlrpc_at_get_net_latency(req));
2820        DEBUG_REQ(D_HA, req, "REPLAY");
2821
2822        atomic_inc(&req->rq_import->imp_replay_inflight);
2823        ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
2824
2825        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
2826        return 0;
2827}
2828EXPORT_SYMBOL(ptlrpc_replay_req);
2829
2830/**
2831 * Aborts all in-flight request on import \a imp sending and delayed lists
2832 */
2833void ptlrpc_abort_inflight(struct obd_import *imp)
2834{
2835        struct list_head *tmp, *n;
2836
2837        /* Make sure that no new requests get processed for this import.
2838         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
2839         * this flag and then putting requests on sending_list or delayed_list.
2840         */
2841        spin_lock(&imp->imp_lock);
2842
2843        /* XXX locking?  Maybe we should remove each request with the list
2844         * locked?  Also, how do we know if the requests on the list are
2845         * being freed at this time?
2846         */
2847        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
2848                struct ptlrpc_request *req =
2849                        list_entry(tmp, struct ptlrpc_request, rq_list);
2850
2851                DEBUG_REQ(D_RPCTRACE, req, "inflight");
2852
2853                spin_lock(&req->rq_lock);
2854                if (req->rq_import_generation < imp->imp_generation) {
2855                        req->rq_err = 1;
2856                        req->rq_status = -EIO;
2857                        ptlrpc_client_wake_req(req);
2858                }
2859                spin_unlock(&req->rq_lock);
2860        }
2861
2862        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
2863                struct ptlrpc_request *req =
2864                        list_entry(tmp, struct ptlrpc_request, rq_list);
2865
2866                DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
2867
2868                spin_lock(&req->rq_lock);
2869                if (req->rq_import_generation < imp->imp_generation) {
2870                        req->rq_err = 1;
2871                        req->rq_status = -EIO;
2872                        ptlrpc_client_wake_req(req);
2873                }
2874                spin_unlock(&req->rq_lock);
2875        }
2876
2877        /* Last chance to free reqs left on the replay list, but we
2878         * will still leak reqs that haven't committed.  */
2879        if (imp->imp_replayable)
2880                ptlrpc_free_committed(imp);
2881
2882        spin_unlock(&imp->imp_lock);
2883}
2884EXPORT_SYMBOL(ptlrpc_abort_inflight);
2885
2886/**
2887 * Abort all uncompleted requests in request set \a set
2888 */
2889void ptlrpc_abort_set(struct ptlrpc_request_set *set)
2890{
2891        struct list_head *tmp, *pos;
2892
2893        LASSERT(set != NULL);
2894
2895        list_for_each_safe(pos, tmp, &set->set_requests) {
2896                struct ptlrpc_request *req =
2897                        list_entry(pos, struct ptlrpc_request,
2898                                       rq_set_chain);
2899
2900                spin_lock(&req->rq_lock);
2901                if (req->rq_phase != RQ_PHASE_RPC) {
2902                        spin_unlock(&req->rq_lock);
2903                        continue;
2904                }
2905
2906                req->rq_err = 1;
2907                req->rq_status = -EINTR;
2908                ptlrpc_client_wake_req(req);
2909                spin_unlock(&req->rq_lock);
2910        }
2911}
2912
2913static __u64 ptlrpc_last_xid;
2914static spinlock_t ptlrpc_last_xid_lock;
2915
2916/**
2917 * Initialize the XID for the node.  This is common among all requests on
2918 * this node, and only requires the property that it is monotonically
2919 * increasing.  It does not need to be sequential.  Since this is also used
2920 * as the RDMA match bits, it is important that a single client NOT have
2921 * the same match bits for two different in-flight requests, hence we do
2922 * NOT want to have an XID per target or similar.
2923 *
2924 * To avoid an unlikely collision between match bits after a client reboot
2925 * (which would deliver old data into the wrong RDMA buffer) initialize
2926 * the XID based on the current time, assuming a maximum RPC rate of 1M RPC/s.
2927 * If the time is clearly incorrect, we instead use a 62-bit random number.
2928 * In the worst case the random number will overflow 1M RPCs per second in
2929 * 9133 years, or permutations thereof.
2930 */
2931#define YEAR_2004 (1ULL << 30)
2932void ptlrpc_init_xid(void)
2933{
2934        time_t now = get_seconds();
2935
2936        spin_lock_init(&ptlrpc_last_xid_lock);
2937        if (now < YEAR_2004) {
2938                cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
2939                ptlrpc_last_xid >>= 2;
2940                ptlrpc_last_xid |= (1ULL << 61);
2941        } else {
2942                ptlrpc_last_xid = (__u64)now << 20;
2943        }
2944
2945        /* Always need to be aligned to a power-of-two for multi-bulk BRW */
2946        CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
2947        ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
2948}
2949
2950/**
2951 * Increase xid and returns resulting new value to the caller.
2952 *
2953 * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
2954 * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
2955 * itself uses the last bulk xid needed, so the server can determine the
2956 * the number of bulk transfers from the RPC XID and a bitmask.  The starting
2957 * xid must align to a power-of-two value.
2958 *
2959 * This is assumed to be true due to the initial ptlrpc_last_xid
2960 * value also being initialized to a power-of-two value. LU-1431
2961 */
2962__u64 ptlrpc_next_xid(void)
2963{
2964        __u64 next;
2965
2966        spin_lock(&ptlrpc_last_xid_lock);
2967        next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2968        ptlrpc_last_xid = next;
2969        spin_unlock(&ptlrpc_last_xid_lock);
2970
2971        return next;
2972}
2973EXPORT_SYMBOL(ptlrpc_next_xid);
2974
2975/**
2976 * Get a glimpse at what next xid value might have been.
2977 * Returns possible next xid.
2978 */
2979__u64 ptlrpc_sample_next_xid(void)
2980{
2981#if BITS_PER_LONG == 32
2982        /* need to avoid possible word tearing on 32-bit systems */
2983        __u64 next;
2984
2985        spin_lock(&ptlrpc_last_xid_lock);
2986        next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2987        spin_unlock(&ptlrpc_last_xid_lock);
2988
2989        return next;
2990#else
2991        /* No need to lock, since returned value is racy anyways */
2992        return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2993#endif
2994}
2995EXPORT_SYMBOL(ptlrpc_sample_next_xid);
2996
2997/**
2998 * Functions for operating ptlrpc workers.
2999 *
3000 * A ptlrpc work is a function which will be running inside ptlrpc context.
3001 * The callback shouldn't sleep otherwise it will block that ptlrpcd thread.
3002 *
3003 * 1. after a work is created, it can be used many times, that is:
3004 *       handler = ptlrpcd_alloc_work();
3005 *       ptlrpcd_queue_work();
3006 *
3007 *    queue it again when necessary:
3008 *       ptlrpcd_queue_work();
3009 *       ptlrpcd_destroy_work();
3010 * 2. ptlrpcd_queue_work() can be called by multiple processes meanwhile, but
3011 *    it will only be queued once in any time. Also as its name implies, it may
3012 *    have delay before it really runs by ptlrpcd thread.
3013 */
3014struct ptlrpc_work_async_args {
3015        int   (*cb)(const struct lu_env *, void *);
3016        void   *cbdata;
3017};
3018
3019static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
3020{
3021        /* re-initialize the req */
3022        req->rq_timeout         = obd_timeout;
3023        req->rq_sent            = get_seconds();
3024        req->rq_deadline        = req->rq_sent + req->rq_timeout;
3025        req->rq_reply_deadline  = req->rq_deadline;
3026        req->rq_phase           = RQ_PHASE_INTERPRET;
3027        req->rq_next_phase      = RQ_PHASE_COMPLETE;
3028        req->rq_xid             = ptlrpc_next_xid();
3029        req->rq_import_generation = req->rq_import->imp_generation;
3030
3031        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3032}
3033
3034static int work_interpreter(const struct lu_env *env,
3035                            struct ptlrpc_request *req, void *data, int rc)
3036{
3037        struct ptlrpc_work_async_args *arg = data;
3038
3039        LASSERT(ptlrpcd_check_work(req));
3040        LASSERT(arg->cb != NULL);
3041
3042        rc = arg->cb(env, arg->cbdata);
3043
3044        list_del_init(&req->rq_set_chain);
3045        req->rq_set = NULL;
3046
3047        if (atomic_dec_return(&req->rq_refcount) > 1) {
3048                atomic_set(&req->rq_refcount, 2);
3049                ptlrpcd_add_work_req(req);
3050        }
3051        return rc;
3052}
3053
3054static int worker_format;
3055
3056static int ptlrpcd_check_work(struct ptlrpc_request *req)
3057{
3058        return req->rq_pill.rc_fmt == (void *)&worker_format;
3059}
3060
3061/**
3062 * Create a work for ptlrpc.
3063 */
3064void *ptlrpcd_alloc_work(struct obd_import *imp,
3065                         int (*cb)(const struct lu_env *, void *), void *cbdata)
3066{
3067        struct ptlrpc_request    *req = NULL;
3068        struct ptlrpc_work_async_args *args;
3069
3070        might_sleep();
3071
3072        if (cb == NULL)
3073                return ERR_PTR(-EINVAL);
3074
3075        /* copy some code from deprecated fakereq. */
3076        req = ptlrpc_request_cache_alloc(GFP_NOFS);
3077        if (req == NULL) {
3078                CERROR("ptlrpc: run out of memory!\n");
3079                return ERR_PTR(-ENOMEM);
3080        }
3081
3082        req->rq_send_state = LUSTRE_IMP_FULL;
3083        req->rq_type = PTL_RPC_MSG_REQUEST;
3084        req->rq_import = class_import_get(imp);
3085        req->rq_export = NULL;
3086        req->rq_interpret_reply = work_interpreter;
3087        /* don't want reply */
3088        req->rq_receiving_reply = 0;
3089        req->rq_req_unlink = req->rq_reply_unlink = 0;
3090        req->rq_no_delay = req->rq_no_resend = 1;
3091        req->rq_pill.rc_fmt = (void *)&worker_format;
3092
3093        spin_lock_init(&req->rq_lock);
3094        INIT_LIST_HEAD(&req->rq_list);
3095        INIT_LIST_HEAD(&req->rq_replay_list);
3096        INIT_LIST_HEAD(&req->rq_set_chain);
3097        INIT_LIST_HEAD(&req->rq_history_list);
3098        INIT_LIST_HEAD(&req->rq_exp_list);
3099        init_waitqueue_head(&req->rq_reply_waitq);
3100        init_waitqueue_head(&req->rq_set_waitq);
3101        atomic_set(&req->rq_refcount, 1);
3102
3103        CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
3104        args = ptlrpc_req_async_args(req);
3105        args->cb     = cb;
3106        args->cbdata = cbdata;
3107
3108        return req;
3109}
3110EXPORT_SYMBOL(ptlrpcd_alloc_work);
3111
3112void ptlrpcd_destroy_work(void *handler)
3113{
3114        struct ptlrpc_request *req = handler;
3115
3116        if (req)
3117                ptlrpc_req_finished(req);
3118}
3119EXPORT_SYMBOL(ptlrpcd_destroy_work);
3120
3121int ptlrpcd_queue_work(void *handler)
3122{
3123        struct ptlrpc_request *req = handler;
3124
3125        /*
3126         * Check if the req is already being queued.
3127         *
3128         * Here comes a trick: it lacks a way of checking if a req is being
3129         * processed reliably in ptlrpc. Here I have to use refcount of req
3130         * for this purpose. This is okay because the caller should use this
3131         * req as opaque data. - Jinshan
3132         */
3133        LASSERT(atomic_read(&req->rq_refcount) > 0);
3134        if (atomic_inc_return(&req->rq_refcount) == 2)
3135                ptlrpcd_add_work_req(req);
3136        return 0;
3137}
3138EXPORT_SYMBOL(ptlrpcd_queue_work);
3139