linux/drivers/staging/lustre/lustre/mdc/mdc_reint.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_MDC
  38
  39# include <linux/module.h>
  40# include <linux/kernel.h>
  41
  42#include <obd_class.h>
  43#include "mdc_internal.h"
  44#include <lustre_fid.h>
  45
  46/* mdc_setattr does its own semaphore handling */
  47static int mdc_reint(struct ptlrpc_request *request,
  48                     struct mdc_rpc_lock *rpc_lock,
  49                     int level)
  50{
  51        int rc;
  52
  53        request->rq_send_state = level;
  54
  55        mdc_get_rpc_lock(rpc_lock, NULL);
  56        rc = ptlrpc_queue_wait(request);
  57        mdc_put_rpc_lock(rpc_lock, NULL);
  58        if (rc)
  59                CDEBUG(D_INFO, "error in handling %d\n", rc);
  60        else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
  61                rc = -EPROTO;
  62        }
  63        return rc;
  64}
  65
  66/* Find and cancel locally locks matched by inode @bits & @mode in the resource
  67 * found by @fid. Found locks are added into @cancel list. Returns the amount of
  68 * locks added to @cancels list. */
  69int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
  70                            struct list_head *cancels, ldlm_mode_t mode,
  71                            __u64 bits)
  72{
  73        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
  74        ldlm_policy_data_t policy = {{0}};
  75        struct ldlm_res_id res_id;
  76        struct ldlm_resource *res;
  77        int count;
  78
  79        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
  80         * export) but disabled through procfs (flag in NS).
  81         *
  82         * This distinguishes from a case when ELC is not supported originally,
  83         * when we still want to cancel locks in advance and just cancel them
  84         * locally, without sending any RPC. */
  85        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
  86                return 0;
  87
  88        fid_build_reg_res_name(fid, &res_id);
  89        res = ldlm_resource_get(exp->exp_obd->obd_namespace,
  90                                NULL, &res_id, 0, 0);
  91        if (res == NULL)
  92                return 0;
  93        LDLM_RESOURCE_ADDREF(res);
  94        /* Initialize ibits lock policy. */
  95        policy.l_inodebits.bits = bits;
  96        count = ldlm_cancel_resource_local(res, cancels, &policy,
  97                                           mode, 0, 0, NULL);
  98        LDLM_RESOURCE_DELREF(res);
  99        ldlm_resource_putref(res);
 100        return count;
 101}
 102
 103int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
 104                void *ea, int ealen, void *ea2, int ea2len,
 105                struct ptlrpc_request **request, struct md_open_data **mod)
 106{
 107        LIST_HEAD(cancels);
 108        struct ptlrpc_request *req;
 109        struct mdc_rpc_lock *rpc_lock;
 110        struct obd_device *obd = exp->exp_obd;
 111        int count = 0, rc;
 112        __u64 bits;
 113
 114        LASSERT(op_data != NULL);
 115
 116        bits = MDS_INODELOCK_UPDATE;
 117        if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
 118                bits |= MDS_INODELOCK_LOOKUP;
 119        if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
 120            (fid_is_sane(&op_data->op_fid1)) &&
 121            !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
 122                count = mdc_resource_get_unused(exp, &op_data->op_fid1,
 123                                                &cancels, LCK_EX, bits);
 124        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 125                                   &RQF_MDS_REINT_SETATTR);
 126        if (req == NULL) {
 127                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 128                return -ENOMEM;
 129        }
 130        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 131        if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
 132                req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
 133                                     0);
 134        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
 135        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
 136                             ea2len);
 137
 138        rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
 139        if (rc) {
 140                ptlrpc_request_free(req);
 141                return rc;
 142        }
 143
 144        rpc_lock = obd->u.cli.cl_rpc_lock;
 145
 146        if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
 147                CDEBUG(D_INODE, "setting mtime "CFS_TIME_T
 148                       ", ctime "CFS_TIME_T"\n",
 149                       LTIME_S(op_data->op_attr.ia_mtime),
 150                       LTIME_S(op_data->op_attr.ia_ctime));
 151        mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
 152
 153        ptlrpc_request_set_replen(req);
 154        if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
 155            req->rq_import->imp_replayable)
 156        {
 157                LASSERT(*mod == NULL);
 158
 159                *mod = obd_mod_alloc();
 160                if (*mod == NULL) {
 161                        DEBUG_REQ(D_ERROR, req, "Can't allocate "
 162                                  "md_open_data");
 163                } else {
 164                        req->rq_replay = 1;
 165                        req->rq_cb_data = *mod;
 166                        (*mod)->mod_open_req = req;
 167                        req->rq_commit_cb = mdc_commit_open;
 168                        /**
 169                         * Take an extra reference on \var mod, it protects \var
 170                         * mod from being freed on eviction (commit callback is
 171                         * called despite rq_replay flag).
 172                         * Will be put on mdc_done_writing().
 173                         */
 174                        obd_mod_get(*mod);
 175                }
 176        }
 177
 178        rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
 179
 180        /* Save the obtained info in the original RPC for the replay case. */
 181        if (rc == 0 && (op_data->op_flags & MF_EPOCH_OPEN)) {
 182                struct mdt_ioepoch *epoch;
 183                struct mdt_body  *body;
 184
 185                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
 186                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 187                LASSERT(epoch != NULL);
 188                LASSERT(body != NULL);
 189                epoch->handle = body->handle;
 190                epoch->ioepoch = body->ioepoch;
 191                req->rq_replay_cb = mdc_replay_open;
 192        /** bug 3633, open may be committed and estale answer is not error */
 193        } else if (rc == -ESTALE && (op_data->op_flags & MF_SOM_CHANGE)) {
 194                rc = 0;
 195        } else if (rc == -ERESTARTSYS) {
 196                rc = 0;
 197        }
 198        *request = req;
 199        if (rc && req->rq_commit_cb) {
 200                /* Put an extra reference on \var mod on error case. */
 201                obd_mod_put(*mod);
 202                req->rq_commit_cb(req);
 203        }
 204        return rc;
 205}
 206
 207int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
 208               const void *data, int datalen, int mode, __u32 uid, __u32 gid,
 209               cfs_cap_t cap_effective, __u64 rdev,
 210               struct ptlrpc_request **request)
 211{
 212        struct ptlrpc_request *req;
 213        int level, rc;
 214        int count, resends = 0;
 215        struct obd_import *import = exp->exp_obd->u.cli.cl_import;
 216        int generation = import->imp_generation;
 217        LIST_HEAD(cancels);
 218
 219        /* For case if upper layer did not alloc fid, do it now. */
 220        if (!fid_is_sane(&op_data->op_fid2)) {
 221                /*
 222                 * mdc_fid_alloc() may return errno 1 in case of switch to new
 223                 * sequence, handle this.
 224                 */
 225                rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
 226                if (rc < 0) {
 227                        CERROR("Can't alloc new fid, rc %d\n", rc);
 228                        return rc;
 229                }
 230        }
 231
 232rebuild:
 233        count = 0;
 234        if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
 235            (fid_is_sane(&op_data->op_fid1)))
 236                count = mdc_resource_get_unused(exp, &op_data->op_fid1,
 237                                                &cancels, LCK_EX,
 238                                                MDS_INODELOCK_UPDATE);
 239
 240        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 241                                   &RQF_MDS_REINT_CREATE_RMT_ACL);
 242        if (req == NULL) {
 243                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 244                return -ENOMEM;
 245        }
 246        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 247        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 248                             op_data->op_namelen + 1);
 249        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
 250                             data && datalen ? datalen : 0);
 251
 252        rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
 253        if (rc) {
 254                ptlrpc_request_free(req);
 255                return rc;
 256        }
 257
 258        /*
 259         * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
 260         * tgt, for symlinks or lov MD data.
 261         */
 262        mdc_create_pack(req, op_data, data, datalen, mode, uid,
 263                        gid, cap_effective, rdev);
 264
 265        ptlrpc_request_set_replen(req);
 266
 267        /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
 268         * logic here */
 269        req->rq_no_retry_einprogress = 1;
 270
 271        if (resends) {
 272                req->rq_generation_set = 1;
 273                req->rq_import_generation = generation;
 274                req->rq_sent = cfs_time_current_sec() + resends;
 275        }
 276        level = LUSTRE_IMP_FULL;
 277 resend:
 278        rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
 279
 280        /* Resend if we were told to. */
 281        if (rc == -ERESTARTSYS) {
 282                level = LUSTRE_IMP_RECOVER;
 283                goto resend;
 284        } else if (rc == -EINPROGRESS) {
 285                /* Retry create infinitely until succeed or get other
 286                 * error code. */
 287                ptlrpc_req_finished(req);
 288                resends++;
 289
 290                CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
 291                       exp->exp_obd->obd_name, resends,
 292                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
 293
 294                if (generation == import->imp_generation) {
 295                        goto rebuild;
 296                } else {
 297                        CDEBUG(D_HA, "resend cross eviction\n");
 298                        return -EIO;
 299                }
 300        } else if (rc == 0) {
 301                struct mdt_body *body;
 302                struct lustre_capa *capa;
 303
 304                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 305                LASSERT(body);
 306                if (body->valid & OBD_MD_FLMDSCAPA) {
 307                        capa = req_capsule_server_get(&req->rq_pill,
 308                                                      &RMF_CAPA1);
 309                        if (capa == NULL)
 310                                rc = -EPROTO;
 311                }
 312        }
 313
 314        *request = req;
 315        return rc;
 316}
 317
 318int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
 319               struct ptlrpc_request **request)
 320{
 321        LIST_HEAD(cancels);
 322        struct obd_device *obd = class_exp2obd(exp);
 323        struct ptlrpc_request *req = *request;
 324        int count = 0, rc;
 325
 326        LASSERT(req == NULL);
 327
 328        if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
 329            (fid_is_sane(&op_data->op_fid1)) &&
 330            !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
 331                count = mdc_resource_get_unused(exp, &op_data->op_fid1,
 332                                                &cancels, LCK_EX,
 333                                                MDS_INODELOCK_UPDATE);
 334        if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
 335            (fid_is_sane(&op_data->op_fid3)) &&
 336            !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
 337                count += mdc_resource_get_unused(exp, &op_data->op_fid3,
 338                                                 &cancels, LCK_EX,
 339                                                 MDS_INODELOCK_FULL);
 340        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 341                                   &RQF_MDS_REINT_UNLINK);
 342        if (req == NULL) {
 343                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 344                return -ENOMEM;
 345        }
 346        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 347        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 348                             op_data->op_namelen + 1);
 349
 350        rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
 351        if (rc) {
 352                ptlrpc_request_free(req);
 353                return rc;
 354        }
 355
 356        mdc_unlink_pack(req, op_data);
 357
 358        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 359                             obd->u.cli.cl_max_mds_easize);
 360        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
 361                             obd->u.cli.cl_max_mds_cookiesize);
 362        ptlrpc_request_set_replen(req);
 363
 364        *request = req;
 365
 366        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
 367        if (rc == -ERESTARTSYS)
 368                rc = 0;
 369        return rc;
 370}
 371
 372int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
 373             struct ptlrpc_request **request)
 374{
 375        LIST_HEAD(cancels);
 376        struct obd_device *obd = exp->exp_obd;
 377        struct ptlrpc_request *req;
 378        int count = 0, rc;
 379
 380        if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
 381            (fid_is_sane(&op_data->op_fid2)))
 382                count = mdc_resource_get_unused(exp, &op_data->op_fid2,
 383                                                &cancels, LCK_EX,
 384                                                MDS_INODELOCK_UPDATE);
 385        if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
 386            (fid_is_sane(&op_data->op_fid1)))
 387                count += mdc_resource_get_unused(exp, &op_data->op_fid1,
 388                                                 &cancels, LCK_EX,
 389                                                 MDS_INODELOCK_UPDATE);
 390
 391        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
 392        if (req == NULL) {
 393                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 394                return -ENOMEM;
 395        }
 396        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 397        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
 398        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 399                             op_data->op_namelen + 1);
 400
 401        rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
 402        if (rc) {
 403                ptlrpc_request_free(req);
 404                return rc;
 405        }
 406
 407        mdc_link_pack(req, op_data);
 408        ptlrpc_request_set_replen(req);
 409
 410        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
 411        *request = req;
 412        if (rc == -ERESTARTSYS)
 413                rc = 0;
 414
 415        return rc;
 416}
 417
 418int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
 419               const char *old, int oldlen, const char *new, int newlen,
 420               struct ptlrpc_request **request)
 421{
 422        LIST_HEAD(cancels);
 423        struct obd_device *obd = exp->exp_obd;
 424        struct ptlrpc_request *req;
 425        int count = 0, rc;
 426
 427        if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
 428            (fid_is_sane(&op_data->op_fid1)))
 429                count = mdc_resource_get_unused(exp, &op_data->op_fid1,
 430                                                &cancels, LCK_EX,
 431                                                MDS_INODELOCK_UPDATE);
 432        if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
 433            (fid_is_sane(&op_data->op_fid2)))
 434                count += mdc_resource_get_unused(exp, &op_data->op_fid2,
 435                                                 &cancels, LCK_EX,
 436                                                 MDS_INODELOCK_UPDATE);
 437        if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
 438            (fid_is_sane(&op_data->op_fid3)))
 439                count += mdc_resource_get_unused(exp, &op_data->op_fid3,
 440                                                 &cancels, LCK_EX,
 441                                                 MDS_INODELOCK_LOOKUP);
 442        if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
 443             (fid_is_sane(&op_data->op_fid4)))
 444                count += mdc_resource_get_unused(exp, &op_data->op_fid4,
 445                                                 &cancels, LCK_EX,
 446                                                 MDS_INODELOCK_FULL);
 447
 448        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 449                                   &RQF_MDS_REINT_RENAME);
 450        if (req == NULL) {
 451                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 452                return -ENOMEM;
 453        }
 454
 455        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 456        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
 457        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
 458        req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
 459
 460        rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
 461        if (rc) {
 462                ptlrpc_request_free(req);
 463                return rc;
 464        }
 465
 466        if (exp_connect_cancelset(exp) && req)
 467                ldlm_cli_cancel_list(&cancels, count, req, 0);
 468
 469        mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
 470
 471        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 472                             obd->u.cli.cl_max_mds_easize);
 473        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
 474                             obd->u.cli.cl_max_mds_cookiesize);
 475        ptlrpc_request_set_replen(req);
 476
 477        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
 478        *request = req;
 479        if (rc == -ERESTARTSYS)
 480                rc = 0;
 481
 482        return rc;
 483}
 484