linux/drivers/staging/lustre/lustre/mdc/mdc_locks.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 */
  32
  33#define DEBUG_SUBSYSTEM S_MDC
  34
  35# include <linux/module.h>
  36
  37#include "../include/lustre_intent.h"
  38#include "../include/obd.h"
  39#include "../include/obd_class.h"
  40#include "../include/lustre_dlm.h"
  41#include "../include/lustre_fid.h"
  42#include "../include/lustre_mdc.h"
  43#include "../include/lustre_net.h"
  44#include "../include/lustre_req_layout.h"
  45#include "../include/lustre_swab.h"
  46
  47#include "mdc_internal.h"
  48
/*
 * Argument bundle pairing an export with a metadata enqueue-info
 * descriptor.
 * NOTE(review): the consumers of this structure are not visible in this
 * part of the file; judging by the name it carries state for a getattr
 * request callback -- confirm against the users.
 */
  49struct mdc_getattr_args {
  50        struct obd_export          *ga_exp;
  51        struct md_enqueue_info      *ga_minfo;
  52};
  53
  54int it_open_error(int phase, struct lookup_intent *it)
  55{
  56        if (it_disposition(it, DISP_OPEN_LEASE)) {
  57                if (phase >= DISP_OPEN_LEASE)
  58                        return it->it_status;
  59                else
  60                        return 0;
  61        }
  62        if (it_disposition(it, DISP_OPEN_OPEN)) {
  63                if (phase >= DISP_OPEN_OPEN)
  64                        return it->it_status;
  65                else
  66                        return 0;
  67        }
  68
  69        if (it_disposition(it, DISP_OPEN_CREATE)) {
  70                if (phase >= DISP_OPEN_CREATE)
  71                        return it->it_status;
  72                else
  73                        return 0;
  74        }
  75
  76        if (it_disposition(it, DISP_LOOKUP_EXECD)) {
  77                if (phase >= DISP_LOOKUP_EXECD)
  78                        return it->it_status;
  79                else
  80                        return 0;
  81        }
  82
  83        if (it_disposition(it, DISP_IT_EXECD)) {
  84                if (phase >= DISP_IT_EXECD)
  85                        return it->it_status;
  86                else
  87                        return 0;
  88        }
  89        CERROR("it disp: %X, status: %d\n", it->it_disposition,
  90               it->it_status);
  91        LBUG();
  92        return 0;
  93}
  94EXPORT_SYMBOL(it_open_error);
  95
  96/* this must be called on a lockh that is known to have a referenced lock */
  97int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
  98                      void *data, __u64 *bits)
  99{
 100        struct ldlm_lock *lock;
 101        struct inode *new_inode = data;
 102
 103        if (bits)
 104                *bits = 0;
 105
 106        if (!lustre_handle_is_used(lockh))
 107                return 0;
 108
 109        lock = ldlm_handle2lock(lockh);
 110
 111        LASSERT(lock);
 112        lock_res_and_lock(lock);
 113        if (lock->l_resource->lr_lvb_inode &&
 114            lock->l_resource->lr_lvb_inode != data) {
 115                struct inode *old_inode = lock->l_resource->lr_lvb_inode;
 116
 117                LASSERTF(old_inode->i_state & I_FREEING,
 118                         "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
 119                         old_inode, old_inode->i_ino, old_inode->i_generation,
 120                         old_inode->i_state, new_inode, new_inode->i_ino,
 121                         new_inode->i_generation);
 122        }
 123        lock->l_resource->lr_lvb_inode = new_inode;
 124        if (bits)
 125                *bits = lock->l_policy_data.l_inodebits.bits;
 126
 127        unlock_res_and_lock(lock);
 128        LDLM_LOCK_PUT(lock);
 129
 130        return 0;
 131}
 132
 133enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
 134                              const struct lu_fid *fid, enum ldlm_type type,
 135                              union ldlm_policy_data *policy,
 136                              enum ldlm_mode mode,
 137                              struct lustre_handle *lockh)
 138{
 139        struct ldlm_res_id res_id;
 140        enum ldlm_mode rc;
 141
 142        fid_build_reg_res_name(fid, &res_id);
 143        /* LU-4405: Clear bits not supported by server */
 144        policy->l_inodebits.bits &= exp_connect_ibits(exp);
 145        rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
 146                             &res_id, type, policy, mode, lockh, 0);
 147        return rc;
 148}
 149
 150int mdc_cancel_unused(struct obd_export *exp,
 151                      const struct lu_fid *fid,
 152                      union ldlm_policy_data *policy,
 153                      enum ldlm_mode mode,
 154                      enum ldlm_cancel_flags flags,
 155                      void *opaque)
 156{
 157        struct ldlm_res_id res_id;
 158        struct obd_device *obd = class_exp2obd(exp);
 159        int rc;
 160
 161        fid_build_reg_res_name(fid, &res_id);
 162        rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
 163                                             policy, mode, flags, opaque);
 164        return rc;
 165}
 166
 167int mdc_null_inode(struct obd_export *exp,
 168                   const struct lu_fid *fid)
 169{
 170        struct ldlm_res_id res_id;
 171        struct ldlm_resource *res;
 172        struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
 173
 174        LASSERTF(ns, "no namespace passed\n");
 175
 176        fid_build_reg_res_name(fid, &res_id);
 177
 178        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
 179        if (IS_ERR(res))
 180                return 0;
 181
 182        lock_res(res);
 183        res->lr_lvb_inode = NULL;
 184        unlock_res(res);
 185
 186        ldlm_resource_putref(res);
 187        return 0;
 188}
 189
 190static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
 191{
 192        /* Don't hold error requests for replay. */
 193        if (req->rq_replay) {
 194                spin_lock(&req->rq_lock);
 195                req->rq_replay = 0;
 196                spin_unlock(&req->rq_lock);
 197        }
 198        if (rc && req->rq_transno != 0) {
 199                DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
 200                LBUG();
 201        }
 202}
 203
 204/* Save a large LOV EA into the request buffer so that it is available
 205 * for replay.  We don't do this in the initial request because the
 206 * original request doesn't need this buffer (at most it sends just the
 207 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
 208 * buffer and may also be difficult to allocate and save a very large
 209 * request buffer for each open. (bug 5707)
 210 *
 211 * OOM here may cause recovery failure if lmm is needed (only for the
 212 * original open if the MDS crashed just when this client also OOM'd)
 213 * but this is incredibly unlikely, and questionable whether the client
 214 * could do MDS recovery under OOM anyways...
 215 */
 216static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 217                                struct mdt_body *body)
 218{
 219        int     rc;
 220
 221        /* FIXME: remove this explicit offset. */
 222        rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
 223                                        body->mbo_eadatasize);
 224        if (rc) {
 225                CERROR("Can't enlarge segment %d size to %d\n",
 226                       DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
 227                body->mbo_valid &= ~OBD_MD_FLEASIZE;
 228                body->mbo_eadatasize = 0;
 229        }
 230}
 231
 232static struct ptlrpc_request *
 233mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
 234                     struct md_op_data *op_data)
 235{
 236        struct ptlrpc_request *req;
 237        struct obd_device     *obddev = class_exp2obd(exp);
 238        struct ldlm_intent    *lit;
 239        const void *lmm = op_data->op_data;
 240        u32 lmmsize = op_data->op_data_size;
 241        LIST_HEAD(cancels);
 242        int                 count = 0;
 243        int                 mode;
 244        int                 rc;
 245
 246        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
 247
 248        /* XXX: openlock is not cancelled for cross-refs. */
 249        /* If inode is known, cancel conflicting OPEN locks. */
 250        if (fid_is_sane(&op_data->op_fid2)) {
 251                if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
 252                        if (it->it_flags & FMODE_WRITE)
 253                                mode = LCK_EX;
 254                        else
 255                                mode = LCK_PR;
 256                } else {
 257                        if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
 258                                mode = LCK_CW;
 259                        else if (it->it_flags & __FMODE_EXEC)
 260                                mode = LCK_PR;
 261                        else
 262                                mode = LCK_CR;
 263                }
 264                count = mdc_resource_get_unused(exp, &op_data->op_fid2,
 265                                                &cancels, mode,
 266                                                MDS_INODELOCK_OPEN);
 267        }
 268
 269        /* If CREATE, cancel parent's UPDATE lock. */
 270        if (it->it_op & IT_CREAT)
 271                mode = LCK_EX;
 272        else
 273                mode = LCK_CR;
 274        count += mdc_resource_get_unused(exp, &op_data->op_fid1,
 275                                         &cancels, mode,
 276                                         MDS_INODELOCK_UPDATE);
 277
 278        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 279                                   &RQF_LDLM_INTENT_OPEN);
 280        if (!req) {
 281                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 282                return ERR_PTR(-ENOMEM);
 283        }
 284
 285        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 286                             op_data->op_namelen + 1);
 287        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
 288                             max(lmmsize, obddev->u.cli.cl_default_mds_easize));
 289
 290        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
 291        if (rc < 0) {
 292                ptlrpc_request_free(req);
 293                return ERR_PTR(rc);
 294        }
 295
 296        spin_lock(&req->rq_lock);
 297        req->rq_replay = req->rq_import->imp_replayable;
 298        spin_unlock(&req->rq_lock);
 299
 300        /* pack the intent */
 301        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 302        lit->opc = (__u64)it->it_op;
 303
 304        /* pack the intended request */
 305        mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
 306                      lmmsize);
 307
 308        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 309                             obddev->u.cli.cl_max_mds_easize);
 310
 311        ptlrpc_request_set_replen(req);
 312        return req;
 313}
 314
 315static struct ptlrpc_request *
 316mdc_intent_getxattr_pack(struct obd_export *exp,
 317                         struct lookup_intent *it,
 318                         struct md_op_data *op_data)
 319{
 320        struct ptlrpc_request   *req;
 321        struct ldlm_intent      *lit;
 322        int rc, count = 0;
 323        u32 maxdata;
 324        LIST_HEAD(cancels);
 325
 326        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 327                                   &RQF_LDLM_INTENT_GETXATTR);
 328        if (!req)
 329                return ERR_PTR(-ENOMEM);
 330
 331        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
 332        if (rc) {
 333                ptlrpc_request_free(req);
 334                return ERR_PTR(rc);
 335        }
 336
 337        /* pack the intent */
 338        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 339        lit->opc = IT_GETXATTR;
 340
 341        maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
 342
 343        /* pack the intended request */
 344        mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
 345                      0);
 346
 347        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
 348
 349        req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
 350
 351        req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
 352                             RCL_SERVER, maxdata);
 353
 354        ptlrpc_request_set_replen(req);
 355
 356        return req;
 357}
 358
 359static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
 360                                                     struct lookup_intent *it,
 361                                                     struct md_op_data *op_data)
 362{
 363        struct ptlrpc_request *req;
 364        struct obd_device     *obddev = class_exp2obd(exp);
 365        struct ldlm_intent    *lit;
 366        int                 rc;
 367
 368        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 369                                   &RQF_LDLM_INTENT_UNLINK);
 370        if (!req)
 371                return ERR_PTR(-ENOMEM);
 372
 373        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 374                             op_data->op_namelen + 1);
 375
 376        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
 377        if (rc) {
 378                ptlrpc_request_free(req);
 379                return ERR_PTR(rc);
 380        }
 381
 382        /* pack the intent */
 383        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 384        lit->opc = (__u64)it->it_op;
 385
 386        /* pack the intended request */
 387        mdc_unlink_pack(req, op_data);
 388
 389        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 390                             obddev->u.cli.cl_default_mds_easize);
 391        ptlrpc_request_set_replen(req);
 392        return req;
 393}
 394
 395static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
 396                                                      struct lookup_intent *it,
 397                                                     struct md_op_data *op_data)
 398{
 399        struct ptlrpc_request *req;
 400        struct obd_device     *obddev = class_exp2obd(exp);
 401        u64                    valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
 402                                       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
 403                                       OBD_MD_MEA | OBD_MD_FLACL;
 404        struct ldlm_intent    *lit;
 405        int                 rc;
 406        u32 easize;
 407
 408        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 409                                   &RQF_LDLM_INTENT_GETATTR);
 410        if (!req)
 411                return ERR_PTR(-ENOMEM);
 412
 413        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 414                             op_data->op_namelen + 1);
 415
 416        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
 417        if (rc) {
 418                ptlrpc_request_free(req);
 419                return ERR_PTR(rc);
 420        }
 421
 422        /* pack the intent */
 423        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 424        lit->opc = (__u64)it->it_op;
 425
 426        if (obddev->u.cli.cl_default_mds_easize > 0)
 427                easize = obddev->u.cli.cl_default_mds_easize;
 428        else
 429                easize = obddev->u.cli.cl_max_mds_easize;
 430
 431        /* pack the intended request */
 432        mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
 433
 434        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
 435        ptlrpc_request_set_replen(req);
 436        return req;
 437}
 438
 439static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
 440                                                     struct lookup_intent *it,
 441                                                     struct md_op_data *unused)
 442{
 443        struct obd_device     *obd = class_exp2obd(exp);
 444        struct ptlrpc_request *req;
 445        struct ldlm_intent    *lit;
 446        struct layout_intent  *layout;
 447        int rc;
 448
 449        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 450                                   &RQF_LDLM_INTENT_LAYOUT);
 451        if (!req)
 452                return ERR_PTR(-ENOMEM);
 453
 454        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
 455        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
 456        if (rc) {
 457                ptlrpc_request_free(req);
 458                return ERR_PTR(rc);
 459        }
 460
 461        /* pack the intent */
 462        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 463        lit->opc = (__u64)it->it_op;
 464
 465        /* pack the layout intent request */
 466        layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
 467        /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
 468         * set for replication
 469         */
 470        layout->li_opc = LAYOUT_INTENT_ACCESS;
 471
 472        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
 473                             obd->u.cli.cl_default_mds_easize);
 474        ptlrpc_request_set_replen(req);
 475        return req;
 476}
 477
 478static struct ptlrpc_request *
 479mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
 480{
 481        struct ptlrpc_request *req;
 482        int rc;
 483
 484        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
 485        if (!req)
 486                return ERR_PTR(-ENOMEM);
 487
 488        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
 489        if (rc) {
 490                ptlrpc_request_free(req);
 491                return ERR_PTR(rc);
 492        }
 493
 494        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
 495        ptlrpc_request_set_replen(req);
 496        return req;
 497}
 498
 499static int mdc_finish_enqueue(struct obd_export *exp,
 500                              struct ptlrpc_request *req,
 501                              struct ldlm_enqueue_info *einfo,
 502                              struct lookup_intent *it,
 503                              struct lustre_handle *lockh,
 504                              int rc)
 505{
 506        struct req_capsule  *pill = &req->rq_pill;
 507        struct ldlm_request *lockreq;
 508        struct ldlm_reply   *lockrep;
 509        struct ldlm_lock    *lock;
 510        void            *lvb_data = NULL;
 511        u32 lvb_len = 0;
 512
 513        LASSERT(rc >= 0);
 514        /* Similarly, if we're going to replay this request, we don't want to
 515         * actually get a lock, just perform the intent.
 516         */
 517        if (req->rq_transno || req->rq_replay) {
 518                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
 519                lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
 520        }
 521
 522        if (rc == ELDLM_LOCK_ABORTED) {
 523                einfo->ei_mode = 0;
 524                memset(lockh, 0, sizeof(*lockh));
 525                rc = 0;
 526        } else { /* rc = 0 */
 527                lock = ldlm_handle2lock(lockh);
 528
 529                /* If the server gave us back a different lock mode, we should
 530                 * fix up our variables.
 531                 */
 532                if (lock->l_req_mode != einfo->ei_mode) {
 533                        ldlm_lock_addref(lockh, lock->l_req_mode);
 534                        ldlm_lock_decref(lockh, einfo->ei_mode);
 535                        einfo->ei_mode = lock->l_req_mode;
 536                }
 537                LDLM_LOCK_PUT(lock);
 538        }
 539
 540        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
 541
 542        it->it_disposition = (int)lockrep->lock_policy_res1;
 543        it->it_status = (int)lockrep->lock_policy_res2;
 544        it->it_lock_mode = einfo->ei_mode;
 545        it->it_lock_handle = lockh->cookie;
 546        it->it_request = req;
 547
 548        /* Technically speaking rq_transno must already be zero if
 549         * it_status is in error, so the check is a bit redundant
 550         */
 551        if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
 552                mdc_clear_replay_flag(req, it->it_status);
 553
 554        /* If we're doing an IT_OPEN which did not result in an actual
 555         * successful open, then we need to remove the bit which saves
 556         * this request for unconditional replay.
 557         *
 558         * It's important that we do this first!  Otherwise we might exit the
 559         * function without doing so, and try to replay a failed create
 560         * (bug 3440)
 561         */
 562        if (it->it_op & IT_OPEN && req->rq_replay &&
 563            (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
 564                mdc_clear_replay_flag(req, it->it_status);
 565
 566        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
 567                  it->it_op, it->it_disposition, it->it_status);
 568
 569        /* We know what to expect, so we do any byte flipping required here */
 570        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
 571                struct mdt_body *body;
 572
 573                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
 574                if (!body) {
 575                        CERROR("Can't swab mdt_body\n");
 576                        return -EPROTO;
 577                }
 578
 579                if (it_disposition(it, DISP_OPEN_OPEN) &&
 580                    !it_open_error(DISP_OPEN_OPEN, it)) {
 581                        /*
 582                         * If this is a successful OPEN request, we need to set
 583                         * replay handler and data early, so that if replay
 584                         * happens immediately after swabbing below, new reply
 585                         * is swabbed by that handler correctly.
 586                         */
 587                        mdc_set_open_replay_data(NULL, NULL, it);
 588                }
 589
 590                if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
 591                        void *eadata;
 592
 593                        mdc_update_max_ea_from_body(exp, body);
 594
 595                        /*
 596                         * The eadata is opaque; just check that it is there.
 597                         * Eventually, obd_unpackmd() will check the contents.
 598                         */
 599                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
 600                                                              body->mbo_eadatasize);
 601                        if (!eadata)
 602                                return -EPROTO;
 603
 604                        /* save lvb data and length in case this is for layout
 605                         * lock
 606                         */
 607                        lvb_data = eadata;
 608                        lvb_len = body->mbo_eadatasize;
 609
 610                        /*
 611                         * We save the reply LOV EA in case we have to replay a
 612                         * create for recovery.  If we didn't allocate a large
 613                         * enough request buffer above we need to reallocate it
 614                         * here to hold the actual LOV EA.
 615                         *
 616                         * To not save LOV EA if request is not going to replay
 617                         * (for example error one).
 618                         */
 619                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
 620                                void *lmm;
 621
 622                                if (req_capsule_get_size(pill, &RMF_EADATA,
 623                                                         RCL_CLIENT) <
 624                                    body->mbo_eadatasize)
 625                                        mdc_realloc_openmsg(req, body);
 626                                else
 627                                        req_capsule_shrink(pill, &RMF_EADATA,
 628                                                           body->mbo_eadatasize,
 629                                                           RCL_CLIENT);
 630
 631                                req_capsule_set_size(pill, &RMF_EADATA,
 632                                                     RCL_CLIENT,
 633                                                     body->mbo_eadatasize);
 634
 635                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
 636                                if (lmm)
 637                                        memcpy(lmm, eadata, body->mbo_eadatasize);
 638                        }
 639                }
 640        } else if (it->it_op & IT_LAYOUT) {
 641                /* maybe the lock was granted right away and layout
 642                 * is packed into RMF_DLM_LVB of req
 643                 */
 644                lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
 645                if (lvb_len > 0) {
 646                        lvb_data = req_capsule_server_sized_get(pill,
 647                                                                &RMF_DLM_LVB,
 648                                                                lvb_len);
 649                        if (!lvb_data)
 650                                return -EPROTO;
 651                }
 652        }
 653
 654        /* fill in stripe data for layout lock */
 655        lock = ldlm_handle2lock(lockh);
 656        if (lock && ldlm_has_layout(lock) && lvb_data) {
 657                void *lmm;
 658
 659                LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
 660                           ldlm_it2str(it->it_op), lvb_len);
 661
 662                lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
 663                if (!lmm) {
 664                        LDLM_LOCK_PUT(lock);
 665                        return -ENOMEM;
 666                }
 667                memcpy(lmm, lvb_data, lvb_len);
 668
 669                /* install lvb_data */
 670                lock_res_and_lock(lock);
 671                if (!lock->l_lvb_data) {
 672                        lock->l_lvb_type = LVB_T_LAYOUT;
 673                        lock->l_lvb_data = lmm;
 674                        lock->l_lvb_len = lvb_len;
 675                        lmm = NULL;
 676                }
 677                unlock_res_and_lock(lock);
 678                if (lmm)
 679                        kvfree(lmm);
 680        }
 681        if (lock)
 682                LDLM_LOCK_PUT(lock);
 683
 684        return rc;
 685}
 686
 687/* We always reserve enough space in the reply packet for a stripe MD, because
 688 * we don't know in advance the file type.
 689 */
 690int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 691                const union ldlm_policy_data *policy,
 692                struct lookup_intent *it, struct md_op_data *op_data,
 693                struct lustre_handle *lockh, u64 extra_lock_flags)
 694{
 695        static const union ldlm_policy_data lookup_policy = {
 696                .l_inodebits = { MDS_INODELOCK_LOOKUP }
 697        };
 698        static const union ldlm_policy_data update_policy = {
 699                .l_inodebits = { MDS_INODELOCK_UPDATE }
 700        };
 701        static const union ldlm_policy_data layout_policy = {
 702                .l_inodebits = { MDS_INODELOCK_LAYOUT }
 703        };
 704        static const union ldlm_policy_data getxattr_policy = {
 705                .l_inodebits = { MDS_INODELOCK_XATTR }
 706        };
 707        struct obd_device *obddev = class_exp2obd(exp);
 708        struct ptlrpc_request *req = NULL;
 709        u64 flags, saved_flags = extra_lock_flags;
 710        struct ldlm_res_id res_id;
 711        int generation, resends = 0;
 712        struct ldlm_reply *lockrep;
 713        enum lvb_type lvb_type = LVB_T_NONE;
 714        int rc;
 715
 716        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
 717                 einfo->ei_type);
 718        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
 719
 720        if (it) {
 721                LASSERT(!policy);
 722
 723                saved_flags |= LDLM_FL_HAS_INTENT;
 724                if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
 725                        policy = &update_policy;
 726                else if (it->it_op & IT_LAYOUT)
 727                        policy = &layout_policy;
 728                else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
 729                        policy = &getxattr_policy;
 730                else
 731                        policy = &lookup_policy;
 732        }
 733
 734        generation = obddev->u.cli.cl_import->imp_generation;
 735resend:
 736        flags = saved_flags;
 737        if (!it) {
 738                /* The only way right now is FLOCK. */
 739                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
 740                         einfo->ei_type);
 741                res_id.name[3] = LDLM_FLOCK;
 742        } else if (it->it_op & IT_OPEN) {
 743                req = mdc_intent_open_pack(exp, it, op_data);
 744        } else if (it->it_op & IT_UNLINK) {
 745                req = mdc_intent_unlink_pack(exp, it, op_data);
 746        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
 747                req = mdc_intent_getattr_pack(exp, it, op_data);
 748        } else if (it->it_op & IT_READDIR) {
 749                req = mdc_enqueue_pack(exp, 0);
 750        } else if (it->it_op & IT_LAYOUT) {
 751                if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
 752                        return -EOPNOTSUPP;
 753                req = mdc_intent_layout_pack(exp, it, op_data);
 754                lvb_type = LVB_T_LAYOUT;
 755        } else if (it->it_op & IT_GETXATTR) {
 756                req = mdc_intent_getxattr_pack(exp, it, op_data);
 757        } else {
 758                LBUG();
 759                return -EINVAL;
 760        }
 761
 762        if (IS_ERR(req))
 763                return PTR_ERR(req);
 764
 765        if (resends) {
 766                req->rq_generation_set = 1;
 767                req->rq_import_generation = generation;
 768                req->rq_sent = ktime_get_real_seconds() + resends;
 769        }
 770
 771        /* It is important to obtain modify RPC slot first (if applicable), so
 772         * that threads that are waiting for a modify RPC slot are not polluting
 773         * our rpcs in flight counter.
 774         * We do not do flock request limiting, though
 775         */
 776        if (it) {
 777                mdc_get_mod_rpc_slot(req, it);
 778                rc = obd_get_request_slot(&obddev->u.cli);
 779                if (rc != 0) {
 780                        mdc_put_mod_rpc_slot(req, it);
 781                        mdc_clear_replay_flag(req, 0);
 782                        ptlrpc_req_finished(req);
 783                        return rc;
 784                }
 785        }
 786
 787        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
 788                              0, lvb_type, lockh, 0);
 789        if (!it) {
 790                /* For flock requests we immediately return without further
 791                 * delay and let caller deal with the rest, since rest of
 792                 * this function metadata processing makes no sense for flock
 793                 * requests anyway. But in case of problem during comms with
 794                 * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
 795                 * can not rely on caller and this mainly for F_UNLCKs
 796                 * (explicits or automatically generated by Kernel to clean
 797                 * current FLocks upon exit) that can't be trashed
 798                 */
 799                if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
 800                    (einfo->ei_type == LDLM_FLOCK) &&
 801                    (einfo->ei_mode == LCK_NL))
 802                        goto resend;
 803                return rc;
 804        }
 805
 806        obd_put_request_slot(&obddev->u.cli);
 807        mdc_put_mod_rpc_slot(req, it);
 808
 809        if (rc < 0) {
 810                CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
 811                       obddev->obd_name, rc);
 812
 813                mdc_clear_replay_flag(req, rc);
 814                ptlrpc_req_finished(req);
 815                return rc;
 816        }
 817
 818        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
 819
 820        lockrep->lock_policy_res2 =
 821                ptlrpc_status_ntoh(lockrep->lock_policy_res2);
 822
 823        /*
 824         * Retry infinitely when the server returns -EINPROGRESS for the
 825         * intent operation, when server returns -EINPROGRESS for acquiring
 826         * intent lock, we'll retry in after_reply().
 827         */
 828        if (it->it_op && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
 829                mdc_clear_replay_flag(req, rc);
 830                ptlrpc_req_finished(req);
 831                resends++;
 832
 833                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
 834                       obddev->obd_name, resends, it->it_op,
 835                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
 836
 837                if (generation == obddev->u.cli.cl_import->imp_generation) {
 838                        goto resend;
 839                } else {
 840                        CDEBUG(D_HA, "resend cross eviction\n");
 841                        return -EIO;
 842                }
 843        }
 844
 845        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
 846        if (rc < 0) {
 847                if (lustre_handle_is_used(lockh)) {
 848                        ldlm_lock_decref(lockh, einfo->ei_mode);
 849                        memset(lockh, 0, sizeof(*lockh));
 850                }
 851                ptlrpc_req_finished(req);
 852
 853                it->it_lock_handle = 0;
 854                it->it_lock_mode = 0;
 855                it->it_request = NULL;
 856        }
 857
 858        return rc;
 859}
 860
 861static int mdc_finish_intent_lock(struct obd_export *exp,
 862                                  struct ptlrpc_request *request,
 863                                  struct md_op_data *op_data,
 864                                  struct lookup_intent *it,
 865                                  struct lustre_handle *lockh)
 866{
 867        struct lustre_handle old_lock;
 868        struct mdt_body *mdt_body;
 869        struct ldlm_lock *lock;
 870        int rc;
 871
 872        LASSERT(request != LP_POISON);
 873        LASSERT(request->rq_repmsg != LP_POISON);
 874
 875        if (it->it_op & IT_READDIR)
 876                return 0;
 877
 878        if (!it_disposition(it, DISP_IT_EXECD)) {
 879                /* The server failed before it even started executing the
 880                 * intent, i.e. because it couldn't unpack the request.
 881                 */
 882                LASSERT(it->it_status != 0);
 883                return it->it_status;
 884        }
 885        rc = it_open_error(DISP_IT_EXECD, it);
 886        if (rc)
 887                return rc;
 888
 889        mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
 890        LASSERT(mdt_body);      /* mdc_enqueue checked */
 891
 892        rc = it_open_error(DISP_LOOKUP_EXECD, it);
 893        if (rc)
 894                return rc;
 895
 896        /* keep requests around for the multiple phases of the call
 897         * this shows the DISP_XX must guarantee we make it into the call
 898         */
 899        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
 900            it_disposition(it, DISP_OPEN_CREATE) &&
 901            !it_open_error(DISP_OPEN_CREATE, it)) {
 902                it_set_disposition(it, DISP_ENQ_CREATE_REF);
 903                ptlrpc_request_addref(request); /* balanced in ll_create_node */
 904        }
 905        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
 906            it_disposition(it, DISP_OPEN_OPEN) &&
 907            !it_open_error(DISP_OPEN_OPEN, it)) {
 908                it_set_disposition(it, DISP_ENQ_OPEN_REF);
 909                ptlrpc_request_addref(request); /* balanced in ll_file_open */
 910                /* BUG 11546 - eviction in the middle of open rpc processing */
 911                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
 912        }
 913
 914        if (it->it_op & IT_CREAT) {
 915                /* XXX this belongs in ll_create_it */
 916        } else if (it->it_op == IT_OPEN) {
 917                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
 918        } else {
 919                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
 920        }
 921
 922        /* If we already have a matching lock, then cancel the new
 923         * one.  We have to set the data here instead of in
 924         * mdc_enqueue, because we need to use the child's inode as
 925         * the l_ast_data to match, and that's not available until
 926         * intent_finish has performed the iget().)
 927         */
 928        lock = ldlm_handle2lock(lockh);
 929        if (lock) {
 930                union ldlm_policy_data policy = lock->l_policy_data;
 931
 932                LDLM_DEBUG(lock, "matching against this");
 933
 934                LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
 935                                         &lock->l_resource->lr_name),
 936                         "Lock res_id: "DLDLMRES", fid: "DFID"\n",
 937                         PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
 938                LDLM_LOCK_PUT(lock);
 939
 940                memcpy(&old_lock, lockh, sizeof(*lockh));
 941                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
 942                                    LDLM_IBITS, &policy, LCK_NL,
 943                                    &old_lock, 0)) {
 944                        ldlm_lock_decref_and_cancel(lockh,
 945                                                    it->it_lock_mode);
 946                        memcpy(lockh, &old_lock, sizeof(old_lock));
 947                        it->it_lock_handle = lockh->cookie;
 948                }
 949        }
 950        CDEBUG(D_DENTRY,
 951               "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
 952               (int)op_data->op_namelen, op_data->op_name,
 953               ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
 954        return rc;
 955}
 956
 957int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
 958                        struct lu_fid *fid, __u64 *bits)
 959{
 960        /* We could just return 1 immediately, but since we should only
 961         * be called in revalidate_it if we already have a lock, let's
 962         * verify that.
 963         */
 964        struct ldlm_res_id res_id;
 965        struct lustre_handle lockh;
 966        union ldlm_policy_data policy;
 967        enum ldlm_mode mode;
 968
 969        if (it->it_lock_handle) {
 970                lockh.cookie = it->it_lock_handle;
 971                mode = ldlm_revalidate_lock_handle(&lockh, bits);
 972        } else {
 973                fid_build_reg_res_name(fid, &res_id);
 974                switch (it->it_op) {
 975                case IT_GETATTR:
 976                        /* File attributes are held under multiple bits:
 977                         * nlink is under lookup lock, size and times are
 978                         * under UPDATE lock and recently we've also got
 979                         * a separate permissions lock for owner/group/acl that
 980                         * were protected by lookup lock before.
 981                         * Getattr must provide all of that information,
 982                         * so we need to ensure we have all of those locks.
 983                         * Unfortunately, if the bits are split across multiple
 984                         * locks, there's no easy way to match all of them here,
 985                         * so an extra RPC would be performed to fetch all
 986                         * of those bits at once for now.
 987                         */
 988                        /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
 989                         * but for old MDTs (< 2.4), permission is covered
 990                         * by LOOKUP lock, so it needs to match all bits here.
 991                         */
 992                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
 993                                                  MDS_INODELOCK_LOOKUP |
 994                                                  MDS_INODELOCK_PERM;
 995                        break;
 996                case IT_READDIR:
 997                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 998                        break;
 999                case IT_LAYOUT:
1000                        policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1001                        break;
1002                default:
1003                        policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1004                        break;
1005                }
1006
1007                mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1008                                      LDLM_IBITS, &policy,
1009                                      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1010                                      &lockh);
1011        }
1012
1013        if (mode) {
1014                it->it_lock_handle = lockh.cookie;
1015                it->it_lock_mode = mode;
1016        } else {
1017                it->it_lock_handle = 0;
1018                it->it_lock_mode = 0;
1019        }
1020
1021        return !!mode;
1022}
1023
1024/*
1025 * This long block is all about fixing up the lock and request state
1026 * so that it is correct as of the moment _before_ the operation was
1027 * applied; that way, the VFS will think that everything is normal and
1028 * call Lustre's regular VFS methods.
1029 *
1030 * If we're performing a creation, that means that unless the creation
1031 * failed with EEXIST, we should fake up a negative dentry.
1032 *
1033 * For everything else, we want to lookup to succeed.
1034 *
1035 * One additional note: if CREATE or OPEN succeeded, we add an extra
1036 * reference to the request because we need to keep it around until
1037 * ll_create/ll_open gets called.
1038 *
1039 * The server will return to us, in it_disposition, an indication of
1040 * exactly what it_status refers to.
1041 *
1042 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1043 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1044 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1045 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1046 * was successful.
1047 *
1048 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1049 * child lookup.
1050 */
1051int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1052                    struct lookup_intent *it, struct ptlrpc_request **reqp,
1053                    ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1054{
1055        struct ldlm_enqueue_info einfo = {
1056                .ei_type        = LDLM_IBITS,
1057                .ei_mode        = it_to_lock_mode(it),
1058                .ei_cb_bl       = cb_blocking,
1059                .ei_cb_cp       = ldlm_completion_ast,
1060        };
1061        struct lustre_handle lockh;
1062        int rc = 0;
1063
1064        LASSERT(it);
1065
1066        CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1067                ", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
1068                op_data->op_name, PFID(&op_data->op_fid2),
1069                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1070                it->it_flags);
1071
1072        lockh.cookie = 0;
1073        if (fid_is_sane(&op_data->op_fid2) &&
1074            (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1075                /* We could just return 1 immediately, but since we should only
1076                 * be called in revalidate_it if we already have a lock, let's
1077                 * verify that.
1078                 */
1079                it->it_lock_handle = 0;
1080                rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1081                /* Only return failure if it was not GETATTR by cfid
1082                 * (from inode_revalidate)
1083                 */
1084                if (rc || op_data->op_namelen != 0)
1085                        return rc;
1086        }
1087
1088        /* For case if upper layer did not alloc fid, do it now. */
1089        if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1090                rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1091                if (rc < 0) {
1092                        CERROR("Can't alloc new fid, rc %d\n", rc);
1093                        return rc;
1094                }
1095        }
1096        rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1097                         extra_lock_flags);
1098        if (rc < 0)
1099                return rc;
1100
1101        *reqp = it->it_request;
1102        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1103        return rc;
1104}
1105
1106static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1107                                              struct ptlrpc_request *req,
1108                                              void *args, int rc)
1109{
1110        struct mdc_getattr_args  *ga = args;
1111        struct obd_export       *exp = ga->ga_exp;
1112        struct md_enqueue_info   *minfo = ga->ga_minfo;
1113        struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1114        struct lookup_intent     *it;
1115        struct lustre_handle     *lockh;
1116        struct obd_device       *obddev;
1117        struct ldlm_reply        *lockrep;
1118        __u64                flags = LDLM_FL_HAS_INTENT;
1119
1120        it    = &minfo->mi_it;
1121        lockh = &minfo->mi_lockh;
1122
1123        obddev = class_exp2obd(exp);
1124
1125        obd_put_request_slot(&obddev->u.cli);
1126        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1127                rc = -ETIMEDOUT;
1128
1129        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1130                                   &flags, NULL, 0, lockh, rc);
1131        if (rc < 0) {
1132                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1133                mdc_clear_replay_flag(req, rc);
1134                goto out;
1135        }
1136
1137        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1138
1139        lockrep->lock_policy_res2 =
1140                ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1141
1142        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1143        if (rc)
1144                goto out;
1145
1146        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1147
1148out:
1149        minfo->mi_cb(req, minfo, rc);
1150        return 0;
1151}
1152
1153int mdc_intent_getattr_async(struct obd_export *exp,
1154                             struct md_enqueue_info *minfo)
1155{
1156        struct md_op_data       *op_data = &minfo->mi_data;
1157        struct lookup_intent    *it = &minfo->mi_it;
1158        struct ptlrpc_request   *req;
1159        struct mdc_getattr_args *ga;
1160        struct obd_device       *obddev = class_exp2obd(exp);
1161        struct ldlm_res_id       res_id;
1162        union ldlm_policy_data policy = {
1163                .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1164        };
1165        int                   rc = 0;
1166        __u64               flags = LDLM_FL_HAS_INTENT;
1167
1168        CDEBUG(D_DLMTRACE,
1169               "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
1170               (int)op_data->op_namelen, op_data->op_name,
1171               PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1172
1173        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1174        req = mdc_intent_getattr_pack(exp, it, op_data);
1175        if (IS_ERR(req))
1176                return PTR_ERR(req);
1177
1178        rc = obd_get_request_slot(&obddev->u.cli);
1179        if (rc != 0) {
1180                ptlrpc_req_finished(req);
1181                return rc;
1182        }
1183
1184        rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1185                              &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1186        if (rc < 0) {
1187                obd_put_request_slot(&obddev->u.cli);
1188                ptlrpc_req_finished(req);
1189                return rc;
1190        }
1191
1192        BUILD_BUG_ON(sizeof(*ga) > sizeof(req->rq_async_args));
1193        ga = ptlrpc_req_async_args(req);
1194        ga->ga_exp = exp;
1195        ga->ga_minfo = minfo;
1196
1197        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1198        ptlrpcd_add_req(req);
1199
1200        return 0;
1201}
1202