linux/drivers/staging/lustre/lustre/mdc/mdc_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_MDC
  38
  39# include <linux/module.h>
  40# include <linux/pagemap.h>
  41# include <linux/miscdevice.h>
  42# include <linux/init.h>
  43# include <linux/utsname.h>
  44
  45#include <lustre_acl.h>
  46#include <obd_class.h>
  47#include <lustre_fid.h>
  48#include <lprocfs_status.h>
  49#include <lustre_param.h>
  50#include <lustre_log.h>
  51
  52#include "mdc_internal.h"
  53
  54#define REQUEST_MINOR 244
  55
  56struct mdc_renew_capa_args { /* NOTE(review): presumably the argument bundle for a capability-renewal request; no user is visible in this part of the file - confirm */
  57        struct obd_capa *ra_oc; /* capability object */
  58        renew_capa_cb_t  ra_cb; /* callback of type renew_capa_cb_t */
  59};
  60
  61static int mdc_cleanup(struct obd_device *obd);
  62
  63int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
  64                    const struct req_msg_field *field, struct obd_capa **oc)
  65{
  66        struct lustre_capa *capa;
  67        struct obd_capa *c;
  68
  69        /* swabbed already in mdc_enqueue */
  70        capa = req_capsule_server_get(&req->rq_pill, field);
  71        if (capa == NULL)
  72                return -EPROTO;
  73
  74        c = alloc_capa(CAPA_SITE_CLIENT);
  75        if (IS_ERR(c)) {
  76                CDEBUG(D_INFO, "alloc capa failed!\n");
  77                return PTR_ERR(c);
  78        } else {
  79                c->c_capa = *capa;
  80                *oc = c;
  81                return 0;
  82        }
  83}
  84
  85static inline int mdc_queue_wait(struct ptlrpc_request *req)
  86{
  87        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
  88        int rc;
  89
  90        /* mdc_enter_request() ensures that this client has no more
  91         * than cl_max_rpcs_in_flight RPCs simultaneously inf light
  92         * against an MDT. */
  93        rc = mdc_enter_request(cli);
  94        if (rc != 0)
  95                return rc;
  96
  97        rc = ptlrpc_queue_wait(req);
  98        mdc_exit_request(cli);
  99
 100        return rc;
 101}
 102
 103/* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 104/* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
 105static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
 106                          struct obd_capa **pc, int level, int msg_flags)
 107{
 108        struct ptlrpc_request *req;
 109        struct mdt_body       *body;
 110        int                 rc;
 111
 112        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_GETSTATUS,
 113                                        LUSTRE_MDS_VERSION, MDS_GETSTATUS);
 114        if (req == NULL)
 115                return -ENOMEM;
 116
 117        mdc_pack_body(req, NULL, NULL, 0, 0, -1, 0);
 118        lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
 119        req->rq_send_state = level;
 120
 121        ptlrpc_request_set_replen(req);
 122
 123        rc = ptlrpc_queue_wait(req);
 124        if (rc)
 125                GOTO(out, rc);
 126
 127        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 128        if (body == NULL)
 129                GOTO(out, rc = -EPROTO);
 130
 131        if (body->valid & OBD_MD_FLMDSCAPA) {
 132                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, pc);
 133                if (rc)
 134                        GOTO(out, rc);
 135        }
 136
 137        *rootfid = body->fid1;
 138        CDEBUG(D_NET,
 139               "root fid="DFID", last_committed="LPU64"\n",
 140               PFID(rootfid),
 141               lustre_msg_get_last_committed(req->rq_repmsg));
 142out:
 143        ptlrpc_req_finished(req);
 144        return rc;
 145}
 146
 147/* This should be mdc_get_info("rootfid") */
 148int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
 149                  struct obd_capa **pc)
 150{
 151        return send_getstatus(class_exp2cliimp(exp), rootfid, pc,
 152                              LUSTRE_IMP_FULL, 0);
 153}
 154
 155/*
 156 * This function now is known to always saying that it will receive 4 buffers
 157 * from server. Even for cases when acl_size and md_size is zero, RPC header
 158 * will contain 4 fields and RPC itself will contain zero size fields. This is
 159 * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
 160 * and thus zero, it shrinks it, making zero size. The same story about
 161 * md_size. And this is course of problem when client waits for smaller number
 162 * of fields. This issue will be fixed later when client gets aware of RPC
 163 * layouts.  --umka
 164 */
 165static int mdc_getattr_common(struct obd_export *exp,
 166                              struct ptlrpc_request *req)
 167{
 168        struct req_capsule *pill = &req->rq_pill;
 169        struct mdt_body    *body;
 170        void           *eadata;
 171        int              rc;
 172
 173        /* Request message already built. */
 174        rc = ptlrpc_queue_wait(req);
 175        if (rc != 0)
 176                return rc;
 177
 178        /* sanity check for the reply */
 179        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
 180        if (body == NULL)
 181                return -EPROTO;
 182
 183        CDEBUG(D_NET, "mode: %o\n", body->mode);
 184
 185        if (body->eadatasize != 0) {
 186                mdc_update_max_ea_from_body(exp, body);
 187
 188                eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
 189                                                      body->eadatasize);
 190                if (eadata == NULL)
 191                        return -EPROTO;
 192        }
 193
 194        if (body->valid & OBD_MD_FLRMTPERM) {
 195                struct mdt_remote_perm *perm;
 196
 197                LASSERT(client_is_remote(exp));
 198                perm = req_capsule_server_swab_get(pill, &RMF_ACL,
 199                                                lustre_swab_mdt_remote_perm);
 200                if (perm == NULL)
 201                        return -EPROTO;
 202        }
 203
 204        if (body->valid & OBD_MD_FLMDSCAPA) {
 205                struct lustre_capa *capa;
 206                capa = req_capsule_server_get(pill, &RMF_CAPA1);
 207                if (capa == NULL)
 208                        return -EPROTO;
 209        }
 210
 211        return 0;
 212}
 213
 214int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
 215                struct ptlrpc_request **request)
 216{
 217        struct ptlrpc_request *req;
 218        int                 rc;
 219
 220        /* Single MDS without an LMV case */
 221        if (op_data->op_flags & MF_GET_MDT_IDX) {
 222                op_data->op_mds = 0;
 223                return 0;
 224        }
 225        *request = NULL;
 226        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
 227        if (req == NULL)
 228                return -ENOMEM;
 229
 230        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 231
 232        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
 233        if (rc) {
 234                ptlrpc_request_free(req);
 235                return rc;
 236        }
 237
 238        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
 239                      op_data->op_valid, op_data->op_mode, -1, 0);
 240
 241        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 242                             op_data->op_mode);
 243        if (op_data->op_valid & OBD_MD_FLRMTPERM) {
 244                LASSERT(client_is_remote(exp));
 245                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
 246                                     sizeof(struct mdt_remote_perm));
 247        }
 248        ptlrpc_request_set_replen(req);
 249
 250        rc = mdc_getattr_common(exp, req);
 251        if (rc)
 252                ptlrpc_req_finished(req);
 253        else
 254                *request = req;
 255        return rc;
 256}
 257
 258int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
 259                     struct ptlrpc_request **request)
 260{
 261        struct ptlrpc_request *req;
 262        int                 rc;
 263
 264        *request = NULL;
 265        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 266                                   &RQF_MDS_GETATTR_NAME);
 267        if (req == NULL)
 268                return -ENOMEM;
 269
 270        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 271        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 272                             op_data->op_namelen + 1);
 273
 274        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
 275        if (rc) {
 276                ptlrpc_request_free(req);
 277                return rc;
 278        }
 279
 280        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
 281                      op_data->op_valid, op_data->op_mode,
 282                      op_data->op_suppgids[0], 0);
 283
 284        if (op_data->op_name) {
 285                char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
 286                LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
 287                                op_data->op_namelen);
 288                memcpy(name, op_data->op_name, op_data->op_namelen);
 289        }
 290
 291        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 292                             op_data->op_mode);
 293        ptlrpc_request_set_replen(req);
 294
 295        rc = mdc_getattr_common(exp, req);
 296        if (rc)
 297                ptlrpc_req_finished(req);
 298        else
 299                *request = req;
 300        return rc;
 301}
 302
 303static int mdc_is_subdir(struct obd_export *exp,
 304                         const struct lu_fid *pfid,
 305                         const struct lu_fid *cfid,
 306                         struct ptlrpc_request **request)
 307{
 308        struct ptlrpc_request  *req;
 309        int                  rc;
 310
 311        *request = NULL;
 312        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
 313                                        &RQF_MDS_IS_SUBDIR, LUSTRE_MDS_VERSION,
 314                                        MDS_IS_SUBDIR);
 315        if (req == NULL)
 316                return -ENOMEM;
 317
 318        mdc_is_subdir_pack(req, pfid, cfid, 0);
 319        ptlrpc_request_set_replen(req);
 320
 321        rc = ptlrpc_queue_wait(req);
 322        if (rc && rc != -EREMOTE)
 323                ptlrpc_req_finished(req);
 324        else
 325                *request = req;
 326        return rc;
 327}
 328
 329static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
 330                            const struct lu_fid *fid,
 331                            struct obd_capa *oc, int opcode, obd_valid valid,
 332                            const char *xattr_name, const char *input,
 333                            int input_size, int output_size, int flags,
 334                            __u32 suppgid, struct ptlrpc_request **request)
 335{
 336        struct ptlrpc_request *req;
 337        int   xattr_namelen = 0;
 338        char *tmp;
 339        int   rc;
 340
 341        *request = NULL;
 342        req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
 343        if (req == NULL)
 344                return -ENOMEM;
 345
 346        mdc_set_capa_size(req, &RMF_CAPA1, oc);
 347        if (xattr_name) {
 348                xattr_namelen = strlen(xattr_name) + 1;
 349                req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 350                                     xattr_namelen);
 351        }
 352        if (input_size) {
 353                LASSERT(input);
 354                req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
 355                                     input_size);
 356        }
 357
 358        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
 359        if (rc) {
 360                ptlrpc_request_free(req);
 361                return rc;
 362        }
 363
 364        if (opcode == MDS_REINT) {
 365                struct mdt_rec_setxattr *rec;
 366
 367                CLASSERT(sizeof(struct mdt_rec_setxattr) ==
 368                         sizeof(struct mdt_rec_reint));
 369                rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 370                rec->sx_opcode = REINT_SETXATTR;
 371                rec->sx_fsuid  = from_kuid(&init_user_ns, current_fsuid());
 372                rec->sx_fsgid  = from_kgid(&init_user_ns, current_fsgid());
 373                rec->sx_cap    = cfs_curproc_cap_pack();
 374                rec->sx_suppgid1 = suppgid;
 375                rec->sx_suppgid2 = -1;
 376                rec->sx_fid    = *fid;
 377                rec->sx_valid  = valid | OBD_MD_FLCTIME;
 378                rec->sx_time   = cfs_time_current_sec();
 379                rec->sx_size   = output_size;
 380                rec->sx_flags  = flags;
 381
 382                mdc_pack_capa(req, &RMF_CAPA1, oc);
 383        } else {
 384                mdc_pack_body(req, fid, oc, valid, output_size, suppgid, flags);
 385        }
 386
 387        if (xattr_name) {
 388                tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
 389                memcpy(tmp, xattr_name, xattr_namelen);
 390        }
 391        if (input_size) {
 392                tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
 393                memcpy(tmp, input, input_size);
 394        }
 395
 396        if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER))
 397                req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
 398                                     RCL_SERVER, output_size);
 399        ptlrpc_request_set_replen(req);
 400
 401        /* make rpc */
 402        if (opcode == MDS_REINT)
 403                mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 404
 405        rc = ptlrpc_queue_wait(req);
 406
 407        if (opcode == MDS_REINT)
 408                mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 409
 410        if (rc)
 411                ptlrpc_req_finished(req);
 412        else
 413                *request = req;
 414        return rc;
 415}
 416
 417int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
 418                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
 419                 const char *input, int input_size, int output_size,
 420                 int flags, __u32 suppgid, struct ptlrpc_request **request)
 421{
 422        return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
 423                                fid, oc, MDS_REINT, valid, xattr_name,
 424                                input, input_size, output_size, flags,
 425                                suppgid, request);
 426}
 427
 428int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
 429                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
 430                 const char *input, int input_size, int output_size,
 431                 int flags, struct ptlrpc_request **request)
 432{
 433        return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
 434                                fid, oc, MDS_GETXATTR, valid, xattr_name,
 435                                input, input_size, output_size, flags,
 436                                -1, request);
 437}
 438
 439#ifdef CONFIG_FS_POSIX_ACL
 440static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
 441{
 442        struct req_capsule     *pill = &req->rq_pill;
 443        struct mdt_body *body = md->body;
 444        struct posix_acl       *acl;
 445        void               *buf;
 446        int                  rc;
 447
 448        if (!body->aclsize)
 449                return 0;
 450
 451        buf = req_capsule_server_sized_get(pill, &RMF_ACL, body->aclsize);
 452
 453        if (!buf)
 454                return -EPROTO;
 455
 456        acl = posix_acl_from_xattr(&init_user_ns, buf, body->aclsize);
 457        if (IS_ERR(acl)) {
 458                rc = PTR_ERR(acl);
 459                CERROR("convert xattr to acl: %d\n", rc);
 460                return rc;
 461        }
 462
 463        rc = posix_acl_valid(acl);
 464        if (rc) {
 465                CERROR("validate acl: %d\n", rc);
 466                posix_acl_release(acl);
 467                return rc;
 468        }
 469
 470        md->posix_acl = acl;
 471        return 0;
 472}
 473#else
 474#define mdc_unpack_acl(req, md) 0
 475#endif
 476
 477int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
 478                      struct obd_export *dt_exp, struct obd_export *md_exp,
 479                      struct lustre_md *md)
 480{
 481        struct req_capsule *pill = &req->rq_pill;
 482        int rc;
 483
 484        LASSERT(md);
 485        memset(md, 0, sizeof(*md));
 486
 487        md->body = req_capsule_server_get(pill, &RMF_MDT_BODY);
 488        LASSERT(md->body != NULL);
 489
 490        if (md->body->valid & OBD_MD_FLEASIZE) {
 491                int lmmsize;
 492                struct lov_mds_md *lmm;
 493
 494                if (!S_ISREG(md->body->mode)) {
 495                        CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, should be a "
 496                               "regular file, but is not\n");
 497                        GOTO(out, rc = -EPROTO);
 498                }
 499
 500                if (md->body->eadatasize == 0) {
 501                        CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, "
 502                               "but eadatasize 0\n");
 503                        GOTO(out, rc = -EPROTO);
 504                }
 505                lmmsize = md->body->eadatasize;
 506                lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize);
 507                if (!lmm)
 508                        GOTO(out, rc = -EPROTO);
 509
 510                rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
 511                if (rc < 0)
 512                        GOTO(out, rc);
 513
 514                if (rc < sizeof(*md->lsm)) {
 515                        CDEBUG(D_INFO, "lsm size too small: "
 516                               "rc < sizeof (*md->lsm) (%d < %d)\n",
 517                               rc, (int)sizeof(*md->lsm));
 518                        GOTO(out, rc = -EPROTO);
 519                }
 520
 521        } else if (md->body->valid & OBD_MD_FLDIREA) {
 522                int lmvsize;
 523                struct lov_mds_md *lmv;
 524
 525                if(!S_ISDIR(md->body->mode)) {
 526                        CDEBUG(D_INFO, "OBD_MD_FLDIREA set, should be a "
 527                               "directory, but is not\n");
 528                        GOTO(out, rc = -EPROTO);
 529                }
 530
 531                if (md->body->eadatasize == 0) {
 532                        CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, "
 533                               "but eadatasize 0\n");
 534                        return -EPROTO;
 535                }
 536                if (md->body->valid & OBD_MD_MEA) {
 537                        lmvsize = md->body->eadatasize;
 538                        lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
 539                                                           lmvsize);
 540                        if (!lmv)
 541                                GOTO(out, rc = -EPROTO);
 542
 543                        rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
 544                                          lmvsize);
 545                        if (rc < 0)
 546                                GOTO(out, rc);
 547
 548                        if (rc < sizeof(*md->mea)) {
 549                                CDEBUG(D_INFO, "size too small:  "
 550                                       "rc < sizeof(*md->mea) (%d < %d)\n",
 551                                        rc, (int)sizeof(*md->mea));
 552                                GOTO(out, rc = -EPROTO);
 553                        }
 554                }
 555        }
 556        rc = 0;
 557
 558        if (md->body->valid & OBD_MD_FLRMTPERM) {
 559                /* remote permission */
 560                LASSERT(client_is_remote(exp));
 561                md->remote_perm = req_capsule_server_swab_get(pill, &RMF_ACL,
 562                                                lustre_swab_mdt_remote_perm);
 563                if (!md->remote_perm)
 564                        GOTO(out, rc = -EPROTO);
 565        }
 566        else if (md->body->valid & OBD_MD_FLACL) {
 567                /* for ACL, it's possible that FLACL is set but aclsize is zero.
 568                 * only when aclsize != 0 there's an actual segment for ACL
 569                 * in reply buffer.
 570                 */
 571                if (md->body->aclsize) {
 572                        rc = mdc_unpack_acl(req, md);
 573                        if (rc)
 574                                GOTO(out, rc);
 575#ifdef CONFIG_FS_POSIX_ACL
 576                } else {
 577                        md->posix_acl = NULL;
 578#endif
 579                }
 580        }
 581        if (md->body->valid & OBD_MD_FLMDSCAPA) {
 582                struct obd_capa *oc = NULL;
 583
 584                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, &oc);
 585                if (rc)
 586                        GOTO(out, rc);
 587                md->mds_capa = oc;
 588        }
 589
 590        if (md->body->valid & OBD_MD_FLOSSCAPA) {
 591                struct obd_capa *oc = NULL;
 592
 593                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA2, &oc);
 594                if (rc)
 595                        GOTO(out, rc);
 596                md->oss_capa = oc;
 597        }
 598
 599out:
 600        if (rc) {
 601                if (md->oss_capa) {
 602                        capa_put(md->oss_capa);
 603                        md->oss_capa = NULL;
 604                }
 605                if (md->mds_capa) {
 606                        capa_put(md->mds_capa);
 607                        md->mds_capa = NULL;
 608                }
 609#ifdef CONFIG_FS_POSIX_ACL
 610                posix_acl_release(md->posix_acl);
 611#endif
 612                if (md->lsm)
 613                        obd_free_memmd(dt_exp, &md->lsm);
 614        }
 615        return rc;
 616}
 617
 618int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
 619{
 620        return 0;
 621}
 622
 623/**
 624 * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
 625 * RPC chains.
 626 */
 627void mdc_replay_open(struct ptlrpc_request *req)
 628{
 629        struct md_open_data *mod = req->rq_cb_data;
 630        struct ptlrpc_request *close_req;
 631        struct obd_client_handle *och;
 632        struct lustre_handle old;
 633        struct mdt_body *body;
 634
 635        if (mod == NULL) {
 636                DEBUG_REQ(D_ERROR, req,
 637                          "Can't properly replay without open data.");
 638                return;
 639        }
 640
 641        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 642        LASSERT(body != NULL);
 643
 644        och = mod->mod_och;
 645        if (och != NULL) {
 646                struct lustre_handle *file_fh;
 647
 648                LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
 649
 650                file_fh = &och->och_fh;
 651                CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
 652                       file_fh->cookie, body->handle.cookie);
 653                old = *file_fh;
 654                *file_fh = body->handle;
 655        }
 656        close_req = mod->mod_close_req;
 657        if (close_req != NULL) {
 658                __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
 659                struct mdt_ioepoch *epoch;
 660
 661                LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
 662                epoch = req_capsule_client_get(&close_req->rq_pill,
 663                                               &RMF_MDT_EPOCH);
 664                LASSERT(epoch);
 665
 666                if (och != NULL)
 667                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
 668                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
 669                epoch->handle = body->handle;
 670        }
 671}
 672
 673void mdc_commit_open(struct ptlrpc_request *req)
 674{
 675        struct md_open_data *mod = req->rq_cb_data;
 676        if (mod == NULL)
 677                return;
 678
 679        /**
 680         * No need to touch md_open_data::mod_och, it holds a reference on
 681         * \var mod and will zero references to each other, \var mod will be
 682         * freed after that when md_open_data::mod_och will put the reference.
 683         */
 684
 685        /**
 686         * Do not let open request to disappear as it still may be needed
 687         * for close rpc to happen (it may happen on evict only, otherwise
 688         * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
 689         * called), just mark this rpc as committed to distinguish these 2
 690         * cases, see mdc_close() for details. The open request reference will
 691         * be put along with freeing \var mod.
 692         */
 693        ptlrpc_request_addref(req);
 694        spin_lock(&req->rq_lock);
 695        req->rq_committed = 1;
 696        spin_unlock(&req->rq_lock);
 697        req->rq_cb_data = NULL;
 698        obd_mod_put(mod);
 699}
 700
 701int mdc_set_open_replay_data(struct obd_export *exp,
 702                             struct obd_client_handle *och,
 703                             struct ptlrpc_request *open_req)
 704{
 705        struct md_open_data   *mod;
 706        struct mdt_rec_create *rec;
 707        struct mdt_body       *body;
 708        struct obd_import     *imp = open_req->rq_import;
 709
 710        if (!open_req->rq_replay)
 711                return 0;
 712
 713        rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT);
 714        body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
 715        LASSERT(rec != NULL);
 716        /* Incoming message in my byte order (it's been swabbed). */
 717        /* Outgoing messages always in my byte order. */
 718        LASSERT(body != NULL);
 719
 720        /* Only if the import is replayable, we set replay_open data */
 721        if (och && imp->imp_replayable) {
 722                mod = obd_mod_alloc();
 723                if (mod == NULL) {
 724                        DEBUG_REQ(D_ERROR, open_req,
 725                                  "Can't allocate md_open_data");
 726                        return 0;
 727                }
 728
 729                /**
 730                 * Take a reference on \var mod, to be freed on mdc_close().
 731                 * It protects \var mod from being freed on eviction (commit
 732                 * callback is called despite rq_replay flag).
 733                 * Another reference for \var och.
 734                 */
 735                obd_mod_get(mod);
 736                obd_mod_get(mod);
 737
 738                spin_lock(&open_req->rq_lock);
 739                och->och_mod = mod;
 740                mod->mod_och = och;
 741                mod->mod_open_req = open_req;
 742                open_req->rq_cb_data = mod;
 743                open_req->rq_commit_cb = mdc_commit_open;
 744                spin_unlock(&open_req->rq_lock);
 745        }
 746
 747        rec->cr_fid2 = body->fid1;
 748        rec->cr_ioepoch = body->ioepoch;
 749        rec->cr_old_handle.cookie = body->handle.cookie;
 750        open_req->rq_replay_cb = mdc_replay_open;
 751        if (!fid_is_sane(&body->fid1)) {
 752                DEBUG_REQ(D_ERROR, open_req, "Saving replay request with "
 753                          "insane fid");
 754                LBUG();
 755        }
 756
 757        DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
 758        return 0;
 759}
 760
 761int mdc_clear_open_replay_data(struct obd_export *exp,
 762                               struct obd_client_handle *och)
 763{
 764        struct md_open_data *mod = och->och_mod;
 765
 766        /**
 767         * It is possible to not have \var mod in a case of eviction between
 768         * lookup and ll_file_open().
 769         **/
 770        if (mod == NULL)
 771                return 0;
 772
 773        LASSERT(mod != LP_POISON);
 774
 775        mod->mod_och = NULL;
 776        och->och_mod = NULL;
 777        obd_mod_put(mod);
 778
 779        return 0;
 780}
 781
 782/* Prepares the request for the replay by the given reply */
 783static void mdc_close_handle_reply(struct ptlrpc_request *req,
 784                                   struct md_op_data *op_data, int rc) {
 785        struct mdt_body  *repbody;
 786        struct mdt_ioepoch *epoch;
 787
 788        if (req && rc == -EAGAIN) {
 789                repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 790                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
 791
 792                epoch->flags |= MF_SOM_AU;
 793                if (repbody->valid & OBD_MD_FLGETATTRLOCK)
 794                        op_data->op_flags |= MF_GETATTR_LOCK;
 795        }
 796}
 797
 798int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 799              struct md_open_data *mod, struct ptlrpc_request **request)
 800{
 801        struct obd_device     *obd = class_exp2obd(exp);
 802        struct ptlrpc_request *req;
 803        struct req_format     *req_fmt;
 804        int                    rc;
 805        int                    saved_rc = 0;
 806
 807
 808        req_fmt = &RQF_MDS_CLOSE;
 809        if (op_data->op_bias & MDS_HSM_RELEASE) {
 810                req_fmt = &RQF_MDS_RELEASE_CLOSE;
 811
 812                /* allocate a FID for volatile file */
 813                rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
 814                if (rc < 0) {
 815                        CERROR("%s: "DFID" failed to allocate FID: %d\n",
 816                               obd->obd_name, PFID(&op_data->op_fid1), rc);
 817                        /* save the errcode and proceed to close */
 818                        saved_rc = rc;
 819                }
 820        }
 821
 822        *request = NULL;
 823        req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt);
 824        if (req == NULL)
 825                return -ENOMEM;
 826
 827        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 828
 829        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
 830        if (rc) {
 831                ptlrpc_request_free(req);
 832                return rc;
 833        }
 834
 835        /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
 836         * portal whose threads are not taking any DLM locks and are therefore
 837         * always progressing */
 838        req->rq_request_portal = MDS_READPAGE_PORTAL;
 839        ptlrpc_at_set_req_timeout(req);
 840
 841        /* Ensure that this close's handle is fixed up during replay. */
 842        if (likely(mod != NULL)) {
 843                LASSERTF(mod->mod_open_req != NULL &&
 844                         mod->mod_open_req->rq_type != LI_POISON,
 845                         "POISONED open %p!\n", mod->mod_open_req);
 846
 847                mod->mod_close_req = req;
 848
 849                DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
 850                /* We no longer want to preserve this open for replay even
 851                 * though the open was committed. b=3632, b=3633 */
 852                spin_lock(&mod->mod_open_req->rq_lock);
 853                mod->mod_open_req->rq_replay = 0;
 854                spin_unlock(&mod->mod_open_req->rq_lock);
 855        } else {
 856                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
 857        }
 858
 859        mdc_close_pack(req, op_data);
 860
 861        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 862                             obd->u.cli.cl_max_mds_easize);
 863        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
 864                             obd->u.cli.cl_max_mds_cookiesize);
 865
 866        ptlrpc_request_set_replen(req);
 867
 868        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 869        rc = ptlrpc_queue_wait(req);
 870        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 871
 872        if (req->rq_repmsg == NULL) {
 873                CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
 874                       req->rq_status);
 875                if (rc == 0)
 876                        rc = req->rq_status ?: -EIO;
 877        } else if (rc == 0 || rc == -EAGAIN) {
 878                struct mdt_body *body;
 879
 880                rc = lustre_msg_get_status(req->rq_repmsg);
 881                if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
 882                        DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
 883                                  "= %d", rc);
 884                        if (rc > 0)
 885                                rc = -rc;
 886                }
 887                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 888                if (body == NULL)
 889                        rc = -EPROTO;
 890        } else if (rc == -ESTALE) {
 891                /**
 892                 * it can be allowed error after 3633 if open was committed and
 893                 * server failed before close was sent. Let's check if mod
 894                 * exists and return no error in that case
 895                 */
 896                if (mod) {
 897                        DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
 898                        LASSERT(mod->mod_open_req != NULL);
 899                        if (mod->mod_open_req->rq_committed)
 900                                rc = 0;
 901                }
 902        }
 903
 904        if (mod) {
 905                if (rc != 0)
 906                        mod->mod_close_req = NULL;
 907                /* Since now, mod is accessed through open_req only,
 908                 * thus close req does not keep a reference on mod anymore. */
 909                obd_mod_put(mod);
 910        }
 911        *request = req;
 912        mdc_close_handle_reply(req, op_data, rc);
 913        return rc < 0 ? rc : saved_rc;
 914}
 915
 916int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
 917                     struct md_open_data *mod)
 918{
 919        struct obd_device     *obd = class_exp2obd(exp);
 920        struct ptlrpc_request *req;
 921        int                 rc;
 922
 923        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 924                                   &RQF_MDS_DONE_WRITING);
 925        if (req == NULL)
 926                return -ENOMEM;
 927
 928        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 929        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
 930        if (rc) {
 931                ptlrpc_request_free(req);
 932                return rc;
 933        }
 934
 935        if (mod != NULL) {
 936                LASSERTF(mod->mod_open_req != NULL &&
 937                         mod->mod_open_req->rq_type != LI_POISON,
 938                         "POISONED setattr %p!\n", mod->mod_open_req);
 939
 940                mod->mod_close_req = req;
 941                DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
 942                /* We no longer want to preserve this setattr for replay even
 943                 * though the open was committed. b=3632, b=3633 */
 944                spin_lock(&mod->mod_open_req->rq_lock);
 945                mod->mod_open_req->rq_replay = 0;
 946                spin_unlock(&mod->mod_open_req->rq_lock);
 947        }
 948
 949        mdc_close_pack(req, op_data);
 950        ptlrpc_request_set_replen(req);
 951
 952        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 953        rc = ptlrpc_queue_wait(req);
 954        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 955
 956        if (rc == -ESTALE) {
 957                /**
 958                 * it can be allowed error after 3633 if open or setattr were
 959                 * committed and server failed before close was sent.
 960                 * Let's check if mod exists and return no error in that case
 961                 */
 962                if (mod) {
 963                        LASSERT(mod->mod_open_req != NULL);
 964                        if (mod->mod_open_req->rq_committed)
 965                                rc = 0;
 966                }
 967        }
 968
 969        if (mod) {
 970                if (rc != 0)
 971                        mod->mod_close_req = NULL;
 972                /* Since now, mod is accessed through setattr req only,
 973                 * thus DW req does not keep a reference on mod anymore. */
 974                obd_mod_put(mod);
 975        }
 976
 977        mdc_close_handle_reply(req, op_data, rc);
 978        ptlrpc_req_finished(req);
 979        return rc;
 980}
 981
 982
 983int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data,
 984                 struct page **pages, struct ptlrpc_request **request)
 985{
 986        struct ptlrpc_request   *req;
 987        struct ptlrpc_bulk_desc *desc;
 988        int                   i;
 989        wait_queue_head_t             waitq;
 990        int                   resends = 0;
 991        struct l_wait_info       lwi;
 992        int                   rc;
 993
 994        *request = NULL;
 995        init_waitqueue_head(&waitq);
 996
 997restart_bulk:
 998        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
 999        if (req == NULL)
1000                return -ENOMEM;
1001
1002        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1003
1004        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
1005        if (rc) {
1006                ptlrpc_request_free(req);
1007                return rc;
1008        }
1009
1010        req->rq_request_portal = MDS_READPAGE_PORTAL;
1011        ptlrpc_at_set_req_timeout(req);
1012
1013        desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
1014                                    MDS_BULK_PORTAL);
1015        if (desc == NULL) {
1016                ptlrpc_request_free(req);
1017                return -ENOMEM;
1018        }
1019
1020        /* NB req now owns desc and will free it when it gets freed */
1021        for (i = 0; i < op_data->op_npages; i++)
1022                ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
1023
1024        mdc_readdir_pack(req, op_data->op_offset,
1025                         PAGE_CACHE_SIZE * op_data->op_npages,
1026                         &op_data->op_fid1, op_data->op_capa1);
1027
1028        ptlrpc_request_set_replen(req);
1029        rc = ptlrpc_queue_wait(req);
1030        if (rc) {
1031                ptlrpc_req_finished(req);
1032                if (rc != -ETIMEDOUT)
1033                        return rc;
1034
1035                resends++;
1036                if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1037                        CERROR("too many resend retries, returning error\n");
1038                        return -EIO;
1039                }
1040                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1041                l_wait_event(waitq, 0, &lwi);
1042
1043                goto restart_bulk;
1044        }
1045
1046        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1047                                          req->rq_bulk->bd_nob_transferred);
1048        if (rc < 0) {
1049                ptlrpc_req_finished(req);
1050                return rc;
1051        }
1052
1053        if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
1054                CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
1055                        req->rq_bulk->bd_nob_transferred,
1056                        PAGE_CACHE_SIZE * op_data->op_npages);
1057                ptlrpc_req_finished(req);
1058                return -EPROTO;
1059        }
1060
1061        *request = req;
1062        return 0;
1063}
1064
1065static int mdc_statfs(const struct lu_env *env,
1066                      struct obd_export *exp, struct obd_statfs *osfs,
1067                      __u64 max_age, __u32 flags)
1068{
1069        struct obd_device     *obd = class_exp2obd(exp);
1070        struct ptlrpc_request *req;
1071        struct obd_statfs     *msfs;
1072        struct obd_import     *imp = NULL;
1073        int                 rc;
1074
1075        /*
1076         * Since the request might also come from lprocfs, so we need
1077         * sync this with client_disconnect_export Bug15684
1078         */
1079        down_read(&obd->u.cli.cl_sem);
1080        if (obd->u.cli.cl_import)
1081                imp = class_import_get(obd->u.cli.cl_import);
1082        up_read(&obd->u.cli.cl_sem);
1083        if (!imp)
1084                return -ENODEV;
1085
1086        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS,
1087                                        LUSTRE_MDS_VERSION, MDS_STATFS);
1088        if (req == NULL)
1089                GOTO(output, rc = -ENOMEM);
1090
1091        ptlrpc_request_set_replen(req);
1092
1093        if (flags & OBD_STATFS_NODELAY) {
1094                /* procfs requests not want stay in wait for avoid deadlock */
1095                req->rq_no_resend = 1;
1096                req->rq_no_delay = 1;
1097        }
1098
1099        rc = ptlrpc_queue_wait(req);
1100        if (rc) {
1101                /* check connection error first */
1102                if (imp->imp_connect_error)
1103                        rc = imp->imp_connect_error;
1104                GOTO(out, rc);
1105        }
1106
1107        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1108        if (msfs == NULL)
1109                GOTO(out, rc = -EPROTO);
1110
1111        *osfs = *msfs;
1112out:
1113        ptlrpc_req_finished(req);
1114output:
1115        class_import_put(imp);
1116        return rc;
1117}
1118
1119static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
1120{
1121        __u32 keylen, vallen;
1122        void *key;
1123        int rc;
1124
1125        if (gf->gf_pathlen > PATH_MAX)
1126                return -ENAMETOOLONG;
1127        if (gf->gf_pathlen < 2)
1128                return -EOVERFLOW;
1129
1130        /* Key is KEY_FID2PATH + getinfo_fid2path description */
1131        keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf);
1132        OBD_ALLOC(key, keylen);
1133        if (key == NULL)
1134                return -ENOMEM;
1135        memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH));
1136        memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf));
1137
1138        CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
1139               PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno);
1140
1141        if (!fid_is_sane(&gf->gf_fid))
1142                GOTO(out, rc = -EINVAL);
1143
1144        /* Val is struct getinfo_fid2path result plus path */
1145        vallen = sizeof(*gf) + gf->gf_pathlen;
1146
1147        rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
1148        if (rc != 0 && rc != -EREMOTE)
1149                GOTO(out, rc);
1150
1151        if (vallen <= sizeof(*gf))
1152                GOTO(out, rc = -EPROTO);
1153        else if (vallen > sizeof(*gf) + gf->gf_pathlen)
1154                GOTO(out, rc = -EOVERFLOW);
1155
1156        CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n%s\n",
1157               PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path);
1158
1159out:
1160        OBD_FREE(key, keylen);
1161        return rc;
1162}
1163
1164static int mdc_ioc_hsm_progress(struct obd_export *exp,
1165                                struct hsm_progress_kernel *hpk)
1166{
1167        struct obd_import               *imp = class_exp2cliimp(exp);
1168        struct hsm_progress_kernel      *req_hpk;
1169        struct ptlrpc_request           *req;
1170        int                              rc;
1171
1172        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS,
1173                                        LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS);
1174        if (req == NULL)
1175                GOTO(out, rc = -ENOMEM);
1176
1177        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1178
1179        /* Copy hsm_progress struct */
1180        req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
1181        if (req_hpk == NULL)
1182                GOTO(out, rc = -EPROTO);
1183
1184        *req_hpk = *hpk;
1185        req_hpk->hpk_errval = lustre_errno_hton(hpk->hpk_errval);
1186
1187        ptlrpc_request_set_replen(req);
1188
1189        rc = mdc_queue_wait(req);
1190        GOTO(out, rc);
1191out:
1192        ptlrpc_req_finished(req);
1193        return rc;
1194}
1195
1196static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
1197{
1198        __u32                   *archive_mask;
1199        struct ptlrpc_request   *req;
1200        int                      rc;
1201
1202        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER,
1203                                        LUSTRE_MDS_VERSION,
1204                                        MDS_HSM_CT_REGISTER);
1205        if (req == NULL)
1206                GOTO(out, rc = -ENOMEM);
1207
1208        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1209
1210        /* Copy hsm_progress struct */
1211        archive_mask = req_capsule_client_get(&req->rq_pill,
1212                                              &RMF_MDS_HSM_ARCHIVE);
1213        if (archive_mask == NULL)
1214                GOTO(out, rc = -EPROTO);
1215
1216        *archive_mask = archives;
1217
1218        ptlrpc_request_set_replen(req);
1219
1220        rc = mdc_queue_wait(req);
1221        GOTO(out, rc);
1222out:
1223        ptlrpc_req_finished(req);
1224        return rc;
1225}
1226
1227static int mdc_ioc_hsm_current_action(struct obd_export *exp,
1228                                      struct md_op_data *op_data)
1229{
1230        struct hsm_current_action       *hca = op_data->op_data;
1231        struct hsm_current_action       *req_hca;
1232        struct ptlrpc_request           *req;
1233        int                              rc;
1234
1235        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1236                                   &RQF_MDS_HSM_ACTION);
1237        if (req == NULL)
1238                return -ENOMEM;
1239
1240        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1241
1242        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
1243        if (rc) {
1244                ptlrpc_request_free(req);
1245                return rc;
1246        }
1247
1248        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1249                      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1250
1251        ptlrpc_request_set_replen(req);
1252
1253        rc = mdc_queue_wait(req);
1254        if (rc)
1255                GOTO(out, rc);
1256
1257        req_hca = req_capsule_server_get(&req->rq_pill,
1258                                         &RMF_MDS_HSM_CURRENT_ACTION);
1259        if (req_hca == NULL)
1260                GOTO(out, rc = -EPROTO);
1261
1262        *hca = *req_hca;
1263
1264out:
1265        ptlrpc_req_finished(req);
1266        return rc;
1267}
1268
1269static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
1270{
1271        struct ptlrpc_request   *req;
1272        int                      rc;
1273
1274        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_UNREGISTER,
1275                                        LUSTRE_MDS_VERSION,
1276                                        MDS_HSM_CT_UNREGISTER);
1277        if (req == NULL)
1278                GOTO(out, rc = -ENOMEM);
1279
1280        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1281
1282        ptlrpc_request_set_replen(req);
1283
1284        rc = mdc_queue_wait(req);
1285        GOTO(out, rc);
1286out:
1287        ptlrpc_req_finished(req);
1288        return rc;
1289}
1290
1291static int mdc_ioc_hsm_state_get(struct obd_export *exp,
1292                                 struct md_op_data *op_data)
1293{
1294        struct hsm_user_state   *hus = op_data->op_data;
1295        struct hsm_user_state   *req_hus;
1296        struct ptlrpc_request   *req;
1297        int                      rc;
1298
1299        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1300                                   &RQF_MDS_HSM_STATE_GET);
1301        if (req == NULL)
1302                return -ENOMEM;
1303
1304        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1305
1306        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
1307        if (rc != 0) {
1308                ptlrpc_request_free(req);
1309                return rc;
1310        }
1311
1312        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1313                      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1314
1315        ptlrpc_request_set_replen(req);
1316
1317        rc = mdc_queue_wait(req);
1318        if (rc)
1319                GOTO(out, rc);
1320
1321        req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
1322        if (req_hus == NULL)
1323                GOTO(out, rc = -EPROTO);
1324
1325        *hus = *req_hus;
1326
1327out:
1328        ptlrpc_req_finished(req);
1329        return rc;
1330}
1331
1332static int mdc_ioc_hsm_state_set(struct obd_export *exp,
1333                                 struct md_op_data *op_data)
1334{
1335        struct hsm_state_set    *hss = op_data->op_data;
1336        struct hsm_state_set    *req_hss;
1337        struct ptlrpc_request   *req;
1338        int                      rc;
1339
1340        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1341                                   &RQF_MDS_HSM_STATE_SET);
1342        if (req == NULL)
1343                return -ENOMEM;
1344
1345        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1346
1347        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
1348        if (rc) {
1349                ptlrpc_request_free(req);
1350                return rc;
1351        }
1352
1353        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1354                      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1355
1356        /* Copy states */
1357        req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
1358        if (req_hss == NULL)
1359                GOTO(out, rc = -EPROTO);
1360        *req_hss = *hss;
1361
1362        ptlrpc_request_set_replen(req);
1363
1364        rc = mdc_queue_wait(req);
1365        GOTO(out, rc);
1366
1367out:
1368        ptlrpc_req_finished(req);
1369        return rc;
1370}
1371
1372static int mdc_ioc_hsm_request(struct obd_export *exp,
1373                               struct hsm_user_request *hur)
1374{
1375        struct obd_import       *imp = class_exp2cliimp(exp);
1376        struct ptlrpc_request   *req;
1377        struct hsm_request      *req_hr;
1378        struct hsm_user_item    *req_hui;
1379        char                    *req_opaque;
1380        int                      rc;
1381
1382        req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
1383        if (req == NULL)
1384                GOTO(out, rc = -ENOMEM);
1385
1386        req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
1387                             hur->hur_request.hr_itemcount
1388                             * sizeof(struct hsm_user_item));
1389        req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
1390                             hur->hur_request.hr_data_len);
1391
1392        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
1393        if (rc) {
1394                ptlrpc_request_free(req);
1395                return rc;
1396        }
1397
1398        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1399
1400        /* Copy hsm_request struct */
1401        req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
1402        if (req_hr == NULL)
1403                GOTO(out, rc = -EPROTO);
1404        *req_hr = hur->hur_request;
1405
1406        /* Copy hsm_user_item structs */
1407        req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
1408        if (req_hui == NULL)
1409                GOTO(out, rc = -EPROTO);
1410        memcpy(req_hui, hur->hur_user_item,
1411               hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
1412
1413        /* Copy opaque field */
1414        req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
1415        if (req_opaque == NULL)
1416                GOTO(out, rc = -EPROTO);
1417        memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
1418
1419        ptlrpc_request_set_replen(req);
1420
1421        rc = mdc_queue_wait(req);
1422        GOTO(out, rc);
1423
1424out:
1425        ptlrpc_req_finished(req);
1426        return rc;
1427}
1428
1429static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
1430{
1431        struct kuc_hdr *lh = (struct kuc_hdr *)buf;
1432
1433        LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE);
1434
1435        lh->kuc_magic = KUC_MAGIC;
1436        lh->kuc_transport = KUC_TRANSPORT_CHANGELOG;
1437        lh->kuc_flags = flags;
1438        lh->kuc_msgtype = CL_RECORD;
1439        lh->kuc_msglen = len;
1440        return lh;
1441}
1442
1443#define D_CHANGELOG 0
1444
1445struct changelog_show {
1446        __u64           cs_startrec;
1447        __u32           cs_flags;
1448        struct file     *cs_fp;
1449        char            *cs_buf;
1450        struct obd_device *cs_obd;
1451};
1452
1453static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
1454                             struct llog_rec_hdr *hdr, void *data)
1455{
1456        struct changelog_show *cs = data;
1457        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
1458        struct kuc_hdr *lh;
1459        int len, rc;
1460
1461        if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
1462                rc = -EINVAL;
1463                CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
1464                       cs->cs_obd->obd_name, rec->cr_hdr.lrh_type,
1465                       rec->cr.cr_type, rc);
1466                return rc;
1467        }
1468
1469        if (rec->cr.cr_index < cs->cs_startrec) {
1470                /* Skip entries earlier than what we are interested in */
1471                CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n",
1472                       rec->cr.cr_index, cs->cs_startrec);
1473                return 0;
1474        }
1475
1476        CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
1477                " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
1478                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
1479                rec->cr.cr_flags & CLF_FLAGMASK,
1480                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
1481                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
1482
1483        len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
1484
1485        /* Set up the message */
1486        lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags);
1487        memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
1488
1489        rc = libcfs_kkuc_msg_put(cs->cs_fp, lh);
1490        CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len,rc);
1491
1492        return rc;
1493}
1494
1495static int mdc_changelog_send_thread(void *csdata)
1496{
1497        struct changelog_show *cs = csdata;
1498        struct llog_ctxt *ctxt = NULL;
1499        struct llog_handle *llh = NULL;
1500        struct kuc_hdr *kuch;
1501        int rc;
1502
1503        CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n",
1504               cs->cs_fp, cs->cs_startrec);
1505
1506        OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
1507        if (cs->cs_buf == NULL)
1508                GOTO(out, rc = -ENOMEM);
1509
1510        /* Set up the remote catalog handle */
1511        ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
1512        if (ctxt == NULL)
1513                GOTO(out, rc = -ENOENT);
1514        rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
1515                       LLOG_OPEN_EXISTS);
1516        if (rc) {
1517                CERROR("%s: fail to open changelog catalog: rc = %d\n",
1518                       cs->cs_obd->obd_name, rc);
1519                GOTO(out, rc);
1520        }
1521        rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL);
1522        if (rc) {
1523                CERROR("llog_init_handle failed %d\n", rc);
1524                GOTO(out, rc);
1525        }
1526
1527        rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
1528
1529        /* Send EOF no matter what our result */
1530        if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch),
1531                                      cs->cs_flags))) {
1532                kuch->kuc_msgtype = CL_EOF;
1533                libcfs_kkuc_msg_put(cs->cs_fp, kuch);
1534        }
1535
1536out:
1537        fput(cs->cs_fp);
1538        if (llh)
1539                llog_cat_close(NULL, llh);
1540        if (ctxt)
1541                llog_ctxt_put(ctxt);
1542        if (cs->cs_buf)
1543                OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
1544        OBD_FREE_PTR(cs);
1545        return rc;
1546}
1547
1548static int mdc_ioc_changelog_send(struct obd_device *obd,
1549                                  struct ioc_changelog *icc)
1550{
1551        struct changelog_show *cs;
1552        int rc;
1553
1554        /* Freed in mdc_changelog_send_thread */
1555        OBD_ALLOC_PTR(cs);
1556        if (!cs)
1557                return -ENOMEM;
1558
1559        cs->cs_obd = obd;
1560        cs->cs_startrec = icc->icc_recno;
1561        /* matching fput in mdc_changelog_send_thread */
1562        cs->cs_fp = fget(icc->icc_id);
1563        cs->cs_flags = icc->icc_flags;
1564
1565        /*
1566         * New thread because we should return to user app before
1567         * writing into our pipe
1568         */
1569        rc = PTR_ERR(kthread_run(mdc_changelog_send_thread, cs,
1570                                 "mdc_clg_send_thread"));
1571        if (!IS_ERR_VALUE(rc)) {
1572                CDEBUG(D_CHANGELOG, "start changelog thread\n");
1573                return 0;
1574        }
1575
1576        CERROR("Failed to start changelog thread: %d\n", rc);
1577        OBD_FREE_PTR(cs);
1578        return rc;
1579}
1580
1581static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
1582                                struct lustre_kernelcomm *lk);
1583
1584static int mdc_quotacheck(struct obd_device *unused, struct obd_export *exp,
1585                          struct obd_quotactl *oqctl)
1586{
1587        struct client_obd       *cli = &exp->exp_obd->u.cli;
1588        struct ptlrpc_request   *req;
1589        struct obd_quotactl     *body;
1590        int                   rc;
1591
1592        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1593                                        &RQF_MDS_QUOTACHECK, LUSTRE_MDS_VERSION,
1594                                        MDS_QUOTACHECK);
1595        if (req == NULL)
1596                return -ENOMEM;
1597
1598        body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1599        *body = *oqctl;
1600
1601        ptlrpc_request_set_replen(req);
1602
1603        /* the next poll will find -ENODATA, that means quotacheck is
1604         * going on */
1605        cli->cl_qchk_stat = -ENODATA;
1606        rc = ptlrpc_queue_wait(req);
1607        if (rc)
1608                cli->cl_qchk_stat = rc;
1609        ptlrpc_req_finished(req);
1610        return rc;
1611}
1612
1613static int mdc_quota_poll_check(struct obd_export *exp,
1614                                struct if_quotacheck *qchk)
1615{
1616        struct client_obd *cli = &exp->exp_obd->u.cli;
1617        int rc;
1618
1619        qchk->obd_uuid = cli->cl_target_uuid;
1620        memcpy(qchk->obd_type, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME));
1621
1622        rc = cli->cl_qchk_stat;
1623        /* the client is not the previous one */
1624        if (rc == CL_NOT_QUOTACHECKED)
1625                rc = -EINTR;
1626        return rc;
1627}
1628
1629static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
1630                        struct obd_quotactl *oqctl)
1631{
1632        struct ptlrpc_request   *req;
1633        struct obd_quotactl     *oqc;
1634        int                   rc;
1635
1636        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1637                                        &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION,
1638                                        MDS_QUOTACTL);
1639        if (req == NULL)
1640                return -ENOMEM;
1641
1642        oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1643        *oqc = *oqctl;
1644
1645        ptlrpc_request_set_replen(req);
1646        ptlrpc_at_set_req_timeout(req);
1647        req->rq_no_resend = 1;
1648
1649        rc = ptlrpc_queue_wait(req);
1650        if (rc)
1651                CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
1652
1653        if (req->rq_repmsg &&
1654            (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) {
1655                *oqctl = *oqc;
1656        } else if (!rc) {
1657                CERROR ("Can't unpack obd_quotactl\n");
1658                rc = -EPROTO;
1659        }
1660        ptlrpc_req_finished(req);
1661
1662        return rc;
1663}
1664
1665static int mdc_ioc_swap_layouts(struct obd_export *exp,
1666                                struct md_op_data *op_data)
1667{
1668        LIST_HEAD(cancels);
1669        struct ptlrpc_request   *req;
1670        int                      rc, count;
1671        struct mdc_swap_layouts *msl, *payload;
1672
1673        msl = op_data->op_data;
1674
1675        /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
1676         * first thing it will do is to cancel the 2 layout
1677         * locks hold by this client.
1678         * So the client must cancel its layout locks on the 2 fids
1679         * with the request RPC to avoid extra RPC round trips
1680         */
1681        count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
1682                                        LCK_CR, MDS_INODELOCK_LAYOUT);
1683        count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
1684                                         LCK_CR, MDS_INODELOCK_LAYOUT);
1685
1686        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1687                                   &RQF_MDS_SWAP_LAYOUTS);
1688        if (req == NULL) {
1689                ldlm_lock_list_put(&cancels, l_bl_ast, count);
1690                return -ENOMEM;
1691        }
1692
1693        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1694        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
1695
1696        rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
1697        if (rc) {
1698                ptlrpc_request_free(req);
1699                return rc;
1700        }
1701
1702        mdc_swap_layouts_pack(req, op_data);
1703
1704        payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
1705        LASSERT(payload);
1706
1707        *payload = *msl;
1708
1709        ptlrpc_request_set_replen(req);
1710
1711        rc = ptlrpc_queue_wait(req);
1712        if (rc)
1713                GOTO(out, rc);
1714
1715out:
1716        ptlrpc_req_finished(req);
1717        return rc;
1718}
1719
1720static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1721                         void *karg, void *uarg)
1722{
1723        struct obd_device *obd = exp->exp_obd;
1724        struct obd_ioctl_data *data = karg;
1725        struct obd_import *imp = obd->u.cli.cl_import;
1726        struct llog_ctxt *ctxt;
1727        int rc;
1728
1729        if (!try_module_get(THIS_MODULE)) {
1730                CERROR("Can't get module. Is it alive?");
1731                return -EINVAL;
1732        }
1733        switch (cmd) {
1734        case OBD_IOC_CHANGELOG_SEND:
1735                rc = mdc_ioc_changelog_send(obd, karg);
1736                GOTO(out, rc);
1737        case OBD_IOC_CHANGELOG_CLEAR: {
1738                struct ioc_changelog *icc = karg;
1739                struct changelog_setinfo cs =
1740                        {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id};
1741                rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR),
1742                                        KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
1743                                        NULL);
1744                GOTO(out, rc);
1745        }
1746        case OBD_IOC_FID2PATH:
1747                rc = mdc_ioc_fid2path(exp, karg);
1748                GOTO(out, rc);
1749        case LL_IOC_HSM_CT_START:
1750                rc = mdc_ioc_hsm_ct_start(exp, karg);
1751                /* ignore if it was already registered on this MDS. */
1752                if (rc == -EEXIST)
1753                        rc = 0;
1754                GOTO(out, rc);
1755        case LL_IOC_HSM_PROGRESS:
1756                rc = mdc_ioc_hsm_progress(exp, karg);
1757                GOTO(out, rc);
1758        case LL_IOC_HSM_STATE_GET:
1759                rc = mdc_ioc_hsm_state_get(exp, karg);
1760                GOTO(out, rc);
1761        case LL_IOC_HSM_STATE_SET:
1762                rc = mdc_ioc_hsm_state_set(exp, karg);
1763                GOTO(out, rc);
1764        case LL_IOC_HSM_ACTION:
1765                rc = mdc_ioc_hsm_current_action(exp, karg);
1766                GOTO(out, rc);
1767        case LL_IOC_HSM_REQUEST:
1768                rc = mdc_ioc_hsm_request(exp, karg);
1769                GOTO(out, rc);
1770        case OBD_IOC_CLIENT_RECOVER:
1771                rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
1772                if (rc < 0)
1773                        GOTO(out, rc);
1774                GOTO(out, rc = 0);
1775        case IOC_OSC_SET_ACTIVE:
1776                rc = ptlrpc_set_import_active(imp, data->ioc_offset);
1777                GOTO(out, rc);
1778        case OBD_IOC_PARSE: {
1779                ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
1780                rc = class_config_parse_llog(NULL, ctxt, data->ioc_inlbuf1,
1781                                             NULL);
1782                llog_ctxt_put(ctxt);
1783                GOTO(out, rc);
1784        }
1785        case OBD_IOC_LLOG_INFO:
1786        case OBD_IOC_LLOG_PRINT: {
1787                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1788                rc = llog_ioctl(NULL, ctxt, cmd, data);
1789                llog_ctxt_put(ctxt);
1790                GOTO(out, rc);
1791        }
1792        case OBD_IOC_POLL_QUOTACHECK:
1793                rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg);
1794                GOTO(out, rc);
1795        case OBD_IOC_PING_TARGET:
1796                rc = ptlrpc_obd_ping(obd);
1797                GOTO(out, rc);
1798        /*
1799         * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
1800         * LMV instead of MDC. But when the cluster is upgraded from 1.8,
1801         * there'd be no LMV layer thus we might be called here. Eventually
1802         * this code should be removed.
1803         * bz20731, LU-592.
1804         */
1805        case IOC_OBD_STATFS: {
1806                struct obd_statfs stat_buf = {0};
1807
1808                if (*((__u32 *) data->ioc_inlbuf2) != 0)
1809                        GOTO(out, rc = -ENODEV);
1810
1811                /* copy UUID */
1812                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
1813                                     min((int) data->ioc_plen2,
1814                                         (int) sizeof(struct obd_uuid))))
1815                        GOTO(out, rc = -EFAULT);
1816
1817                rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
1818                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1819                                0);
1820                if (rc != 0)
1821                        GOTO(out, rc);
1822
1823                if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1824                                     min((int) data->ioc_plen1,
1825                                         (int) sizeof(stat_buf))))
1826                        GOTO(out, rc = -EFAULT);
1827
1828                GOTO(out, rc = 0);
1829        }
1830        case OBD_IOC_QUOTACTL: {
1831                struct if_quotactl *qctl = karg;
1832                struct obd_quotactl *oqctl;
1833
1834                OBD_ALLOC_PTR(oqctl);
1835                if (oqctl == NULL)
1836                        GOTO(out, rc = -ENOMEM);
1837
1838                QCTL_COPY(oqctl, qctl);
1839                rc = obd_quotactl(exp, oqctl);
1840                if (rc == 0) {
1841                        QCTL_COPY(qctl, oqctl);
1842                        qctl->qc_valid = QC_MDTIDX;
1843                        qctl->obd_uuid = obd->u.cli.cl_target_uuid;
1844                }
1845
1846                OBD_FREE_PTR(oqctl);
1847                GOTO(out, rc);
1848        }
1849        case LL_IOC_GET_CONNECT_FLAGS:
1850                if (copy_to_user(uarg, exp_connect_flags_ptr(exp),
1851                                 sizeof(*exp_connect_flags_ptr(exp))))
1852                        GOTO(out, rc = -EFAULT);
1853
1854                GOTO(out, rc = 0);
1855        case LL_IOC_LOV_SWAP_LAYOUTS:
1856                rc = mdc_ioc_swap_layouts(exp, karg);
1857                GOTO(out, rc);
1858        default:
1859                CERROR("unrecognised ioctl: cmd = %#x\n", cmd);
1860                GOTO(out, rc = -ENOTTY);
1861        }
1862out:
1863        module_put(THIS_MODULE);
1864
1865        return rc;
1866}
1867
1868int mdc_get_info_rpc(struct obd_export *exp,
1869                     obd_count keylen, void *key,
1870                     int vallen, void *val)
1871{
1872        struct obd_import      *imp = class_exp2cliimp(exp);
1873        struct ptlrpc_request  *req;
1874        char               *tmp;
1875        int                  rc = -EINVAL;
1876
1877        req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO);
1878        if (req == NULL)
1879                return -ENOMEM;
1880
1881        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
1882                             RCL_CLIENT, keylen);
1883        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
1884                             RCL_CLIENT, sizeof(__u32));
1885
1886        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
1887        if (rc) {
1888                ptlrpc_request_free(req);
1889                return rc;
1890        }
1891
1892        tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
1893        memcpy(tmp, key, keylen);
1894        tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
1895        memcpy(tmp, &vallen, sizeof(__u32));
1896
1897        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
1898                             RCL_SERVER, vallen);
1899        ptlrpc_request_set_replen(req);
1900
1901        rc = ptlrpc_queue_wait(req);
1902        /* -EREMOTE means the get_info result is partial, and it needs to
1903         * continue on another MDT, see fid2path part in lmv_iocontrol */
1904        if (rc == 0 || rc == -EREMOTE) {
1905                tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
1906                memcpy(val, tmp, vallen);
1907                if (ptlrpc_rep_need_swab(req)) {
1908                        if (KEY_IS(KEY_FID2PATH))
1909                                lustre_swab_fid2path(val);
1910                }
1911        }
1912        ptlrpc_req_finished(req);
1913
1914        return rc;
1915}
1916
1917static void lustre_swab_hai(struct hsm_action_item *h)
1918{
1919        __swab32s(&h->hai_len);
1920        __swab32s(&h->hai_action);
1921        lustre_swab_lu_fid(&h->hai_fid);
1922        lustre_swab_lu_fid(&h->hai_dfid);
1923        __swab64s(&h->hai_cookie);
1924        __swab64s(&h->hai_extent.offset);
1925        __swab64s(&h->hai_extent.length);
1926        __swab64s(&h->hai_gid);
1927}
1928
1929static void lustre_swab_hal(struct hsm_action_list *h)
1930{
1931        struct hsm_action_item  *hai;
1932        int                      i;
1933
1934        __swab32s(&h->hal_version);
1935        __swab32s(&h->hal_count);
1936        __swab32s(&h->hal_archive_id);
1937        __swab64s(&h->hal_flags);
1938        hai = hai_zero(h);
1939        for (i = 0; i < h->hal_count; i++, hai = hai_next(hai))
1940                lustre_swab_hai(hai);
1941}
1942
1943static void lustre_swab_kuch(struct kuc_hdr *l)
1944{
1945        __swab16s(&l->kuc_magic);
1946        /* __u8 l->kuc_transport */
1947        __swab16s(&l->kuc_msgtype);
1948        __swab16s(&l->kuc_msglen);
1949}
1950
1951static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
1952                                struct lustre_kernelcomm *lk)
1953{
1954        struct obd_import  *imp = class_exp2cliimp(exp);
1955        __u32               archive = lk->lk_data;
1956        int                 rc = 0;
1957
1958        if (lk->lk_group != KUC_GRP_HSM) {
1959                CERROR("Bad copytool group %d\n", lk->lk_group);
1960                return -EINVAL;
1961        }
1962
1963        CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd,
1964               lk->lk_uid, lk->lk_group, lk->lk_flags);
1965
1966        if (lk->lk_flags & LK_FLG_STOP) {
1967                /* Unregister with the coordinator */
1968                rc = mdc_ioc_hsm_ct_unregister(imp);
1969        } else {
1970                rc = mdc_ioc_hsm_ct_register(imp, archive);
1971        }
1972
1973        return rc;
1974}
1975
1976/**
1977 * Send a message to any listening copytools
1978 * @param val KUC message (kuc_hdr + hsm_action_list)
1979 * @param len total length of message
1980 */
1981static int mdc_hsm_copytool_send(int len, void *val)
1982{
1983        struct kuc_hdr          *lh = (struct kuc_hdr *)val;
1984        struct hsm_action_list  *hal = (struct hsm_action_list *)(lh + 1);
1985        int                      rc;
1986
1987        if (len < sizeof(*lh) + sizeof(*hal)) {
1988                CERROR("Short HSM message %d < %d\n", len,
1989                       (int) (sizeof(*lh) + sizeof(*hal)));
1990                return -EPROTO;
1991        }
1992        if (lh->kuc_magic == __swab16(KUC_MAGIC)) {
1993                lustre_swab_kuch(lh);
1994                lustre_swab_hal(hal);
1995        } else if (lh->kuc_magic != KUC_MAGIC) {
1996                CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC);
1997                return -EPROTO;
1998        }
1999
2000        CDEBUG(D_HSM, " Received message mg=%x t=%d m=%d l=%d actions=%d "
2001               "on %s\n",
2002               lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype,
2003               lh->kuc_msglen, hal->hal_count, hal->hal_fsname);
2004
2005        /* Broadcast to HSM listeners */
2006        rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh);
2007
2008        return rc;
2009}
2010
2011/**
2012 * callback function passed to kuc for re-registering each HSM copytool
2013 * running on MDC, after MDT shutdown/recovery.
2014 * @param data archive id served by the copytool
2015 * @param cb_arg callback argument (obd_import)
2016 */
2017static int mdc_hsm_ct_reregister(__u32 data, void *cb_arg)
2018{
2019        struct obd_import       *imp = (struct obd_import *)cb_arg;
2020        __u32                    archive = data;
2021        int                      rc;
2022
2023        CDEBUG(D_HA, "recover copytool registration to MDT (archive=%#x)\n",
2024               archive);
2025        rc = mdc_ioc_hsm_ct_register(imp, archive);
2026
2027        /* ignore error if the copytool is already registered */
2028        return ((rc != 0) && (rc != -EEXIST)) ? rc : 0;
2029}
2030
2031/**
2032 * Re-establish all kuc contexts with MDT
2033 * after MDT shutdown/recovery.
2034 */
2035static int mdc_kuc_reregister(struct obd_import *imp)
2036{
2037        /* re-register HSM agents */
2038        return libcfs_kkuc_group_foreach(KUC_GRP_HSM, mdc_hsm_ct_reregister,
2039                                         (void *)imp);
2040}
2041
2042int mdc_set_info_async(const struct lu_env *env,
2043                       struct obd_export *exp,
2044                       obd_count keylen, void *key,
2045                       obd_count vallen, void *val,
2046                       struct ptlrpc_request_set *set)
2047{
2048        struct obd_import       *imp = class_exp2cliimp(exp);
2049        int                      rc;
2050
2051        if (KEY_IS(KEY_READ_ONLY)) {
2052                if (vallen != sizeof(int))
2053                        return -EINVAL;
2054
2055                spin_lock(&imp->imp_lock);
2056                if (*((int *)val)) {
2057                        imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
2058                        imp->imp_connect_data.ocd_connect_flags |=
2059                                                        OBD_CONNECT_RDONLY;
2060                } else {
2061                        imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
2062                        imp->imp_connect_data.ocd_connect_flags &=
2063                                                        ~OBD_CONNECT_RDONLY;
2064                }
2065                spin_unlock(&imp->imp_lock);
2066
2067                rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2068                                       keylen, key, vallen, val, set);
2069                return rc;
2070        }
2071        if (KEY_IS(KEY_SPTLRPC_CONF)) {
2072                sptlrpc_conf_client_adapt(exp->exp_obd);
2073                return 0;
2074        }
2075        if (KEY_IS(KEY_FLUSH_CTX)) {
2076                sptlrpc_import_flush_my_ctx(imp);
2077                return 0;
2078        }
2079        if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2080                rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2081                                       keylen, key, vallen, val, set);
2082                return rc;
2083        }
2084        if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) {
2085                rc = mdc_hsm_copytool_send(vallen, val);
2086                return rc;
2087        }
2088
2089        CERROR("Unknown key %s\n", (char *)key);
2090        return -EINVAL;
2091}
2092
2093int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
2094                 __u32 keylen, void *key, __u32 *vallen, void *val,
2095                 struct lov_stripe_md *lsm)
2096{
2097        int rc = -EINVAL;
2098
2099        if (KEY_IS(KEY_MAX_EASIZE)) {
2100                int mdsize, *max_easize;
2101
2102                if (*vallen != sizeof(int))
2103                        return -EINVAL;
2104                mdsize = *(int*)val;
2105                if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
2106                        exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
2107                max_easize = val;
2108                *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
2109                return 0;
2110        } else if (KEY_IS(KEY_CONN_DATA)) {
2111                struct obd_import *imp = class_exp2cliimp(exp);
2112                struct obd_connect_data *data = val;
2113
2114                if (*vallen != sizeof(*data))
2115                        return -EINVAL;
2116
2117                *data = imp->imp_connect_data;
2118                return 0;
2119        } else if (KEY_IS(KEY_TGT_COUNT)) {
2120                *((int *)val) = 1;
2121                return 0;
2122        }
2123
2124        rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val);
2125
2126        return rc;
2127}
2128
2129static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid,
2130                   struct obd_capa *oc, struct obd_client_handle *handle,
2131                   int flags)
2132{
2133        struct ptlrpc_request *req;
2134        struct mdt_body       *body;
2135        int                 rc;
2136
2137        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_PIN);
2138        if (req == NULL)
2139                return -ENOMEM;
2140
2141        mdc_set_capa_size(req, &RMF_CAPA1, oc);
2142
2143        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_PIN);
2144        if (rc) {
2145                ptlrpc_request_free(req);
2146                return rc;
2147        }
2148
2149        mdc_pack_body(req, fid, oc, 0, 0, -1, flags);
2150
2151        ptlrpc_request_set_replen(req);
2152
2153        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2154        rc = ptlrpc_queue_wait(req);
2155        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2156        if (rc) {
2157                CERROR("Pin failed: %d\n", rc);
2158                GOTO(err_out, rc);
2159        }
2160
2161        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2162        if (body == NULL)
2163                GOTO(err_out, rc = -EPROTO);
2164
2165        handle->och_fh = body->handle;
2166        handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
2167
2168        handle->och_mod = obd_mod_alloc();
2169        if (handle->och_mod == NULL) {
2170                DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
2171                GOTO(err_out, rc = -ENOMEM);
2172        }
2173        handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
2174
2175        return 0;
2176
2177err_out:
2178        ptlrpc_req_finished(req);
2179        return rc;
2180}
2181
2182static int mdc_unpin(struct obd_export *exp, struct obd_client_handle *handle,
2183                     int flag)
2184{
2185        struct ptlrpc_request *req;
2186        struct mdt_body       *body;
2187        int                 rc;
2188
2189        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_UNPIN,
2190                                        LUSTRE_MDS_VERSION, MDS_UNPIN);
2191        if (req == NULL)
2192                return -ENOMEM;
2193
2194        body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY);
2195        body->handle = handle->och_fh;
2196        body->flags = flag;
2197
2198        ptlrpc_request_set_replen(req);
2199
2200        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2201        rc = ptlrpc_queue_wait(req);
2202        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2203
2204        if (rc != 0)
2205                CERROR("Unpin failed: %d\n", rc);
2206
2207        ptlrpc_req_finished(req);
2208        ptlrpc_req_finished(handle->och_mod->mod_open_req);
2209
2210        obd_mod_put(handle->och_mod);
2211        return rc;
2212}
2213
2214int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
2215             struct obd_capa *oc, struct ptlrpc_request **request)
2216{
2217        struct ptlrpc_request *req;
2218        int                 rc;
2219
2220        *request = NULL;
2221        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC);
2222        if (req == NULL)
2223                return -ENOMEM;
2224
2225        mdc_set_capa_size(req, &RMF_CAPA1, oc);
2226
2227        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
2228        if (rc) {
2229                ptlrpc_request_free(req);
2230                return rc;
2231        }
2232
2233        mdc_pack_body(req, fid, oc, 0, 0, -1, 0);
2234
2235        ptlrpc_request_set_replen(req);
2236
2237        rc = ptlrpc_queue_wait(req);
2238        if (rc)
2239                ptlrpc_req_finished(req);
2240        else
2241                *request = req;
2242        return rc;
2243}
2244
2245static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
2246                            enum obd_import_event event)
2247{
2248        int rc = 0;
2249
2250        LASSERT(imp->imp_obd == obd);
2251
2252        switch (event) {
2253        case IMP_EVENT_DISCON: {
2254#if 0
2255                /* XXX Pass event up to OBDs stack. used only for FLD now */
2256                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL);
2257#endif
2258                break;
2259        }
2260        case IMP_EVENT_INACTIVE: {
2261                struct client_obd *cli = &obd->u.cli;
2262                /*
2263                 * Flush current sequence to make client obtain new one
2264                 * from server in case of disconnect/reconnect.
2265                 */
2266                if (cli->cl_seq != NULL)
2267                        seq_client_flush(cli->cl_seq);
2268
2269                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2270                break;
2271        }
2272        case IMP_EVENT_INVALIDATE: {
2273                struct ldlm_namespace *ns = obd->obd_namespace;
2274
2275                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2276
2277                break;
2278        }
2279        case IMP_EVENT_ACTIVE:
2280                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2281                /* redo the kuc registration after reconnecting */
2282                if (rc == 0)
2283                        rc = mdc_kuc_reregister(imp);
2284                break;
2285        case IMP_EVENT_OCD:
2286                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2287                break;
2288        case IMP_EVENT_DEACTIVATE:
2289        case IMP_EVENT_ACTIVATE:
2290                break;
2291        default:
2292                CERROR("Unknown import event %x\n", event);
2293                LBUG();
2294        }
2295        return rc;
2296}
2297
2298int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
2299                  struct md_op_data *op_data)
2300{
2301        struct client_obd *cli = &exp->exp_obd->u.cli;
2302        struct lu_client_seq *seq = cli->cl_seq;
2303
2304        return seq_client_alloc_fid(NULL, seq, fid);
2305}
2306
2307struct obd_uuid *mdc_get_uuid(struct obd_export *exp) {
2308        struct client_obd *cli = &exp->exp_obd->u.cli;
2309        return &cli->cl_target_uuid;
2310}
2311
2312/**
2313 * Determine whether the lock can be canceled before replaying it during
2314 * recovery, non zero value will be return if the lock can be canceled,
2315 * or zero returned for not
2316 */
2317static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
2318{
2319        if (lock->l_resource->lr_type != LDLM_IBITS)
2320                return 0;
2321
2322        /* FIXME: if we ever get into a situation where there are too many
2323         * opened files with open locks on a single node, then we really
2324         * should replay these open locks to reget it */
2325        if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN)
2326                return 0;
2327
2328        return 1;
2329}
2330
2331static int mdc_resource_inode_free(struct ldlm_resource *res)
2332{
2333        if (res->lr_lvb_inode)
2334                res->lr_lvb_inode = NULL;
2335
2336        return 0;
2337}
2338
2339struct ldlm_valblock_ops inode_lvbo = {
2340        .lvbo_free = mdc_resource_inode_free,
2341};
2342
2343static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
2344{
2345        struct client_obd *cli = &obd->u.cli;
2346        struct lprocfs_static_vars lvars = { 0 };
2347        int rc;
2348
2349        OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
2350        if (!cli->cl_rpc_lock)
2351                return -ENOMEM;
2352        mdc_init_rpc_lock(cli->cl_rpc_lock);
2353
2354        ptlrpcd_addref();
2355
2356        OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
2357        if (!cli->cl_close_lock)
2358                GOTO(err_rpc_lock, rc = -ENOMEM);
2359        mdc_init_rpc_lock(cli->cl_close_lock);
2360
2361        rc = client_obd_setup(obd, cfg);
2362        if (rc)
2363                GOTO(err_close_lock, rc);
2364        lprocfs_mdc_init_vars(&lvars);
2365        lprocfs_obd_setup(obd, lvars.obd_vars);
2366        sptlrpc_lprocfs_cliobd_attach(obd);
2367        ptlrpc_lprocfs_register_obd(obd);
2368
2369        ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
2370
2371        obd->obd_namespace->ns_lvbo = &inode_lvbo;
2372
2373        rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
2374        if (rc) {
2375                mdc_cleanup(obd);
2376                CERROR("failed to setup llogging subsystems\n");
2377        }
2378
2379        return rc;
2380
2381err_close_lock:
2382        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
2383err_rpc_lock:
2384        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
2385        ptlrpcd_decref();
2386        return rc;
2387}
2388
2389/* Initialize the default and maximum LOV EA and cookie sizes.  This allows
2390 * us to make MDS RPCs with large enough reply buffers to hold the
2391 * maximum-sized (= maximum striped) EA and cookie without having to
2392 * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
2393static int mdc_init_ea_size(struct obd_export *exp, int easize,
2394                     int def_easize, int cookiesize)
2395{
2396        struct obd_device *obd = exp->exp_obd;
2397        struct client_obd *cli = &obd->u.cli;
2398
2399        if (cli->cl_max_mds_easize < easize)
2400                cli->cl_max_mds_easize = easize;
2401
2402        if (cli->cl_default_mds_easize < def_easize)
2403                cli->cl_default_mds_easize = def_easize;
2404
2405        if (cli->cl_max_mds_cookiesize < cookiesize)
2406                cli->cl_max_mds_cookiesize = cookiesize;
2407
2408        return 0;
2409}
2410
2411static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2412{
2413        int rc = 0;
2414
2415        switch (stage) {
2416        case OBD_CLEANUP_EARLY:
2417                break;
2418        case OBD_CLEANUP_EXPORTS:
2419                /* Failsafe, ok if racy */
2420                if (obd->obd_type->typ_refcnt <= 1)
2421                        libcfs_kkuc_group_rem(0, KUC_GRP_HSM);
2422
2423                obd_cleanup_client_import(obd);
2424                ptlrpc_lprocfs_unregister_obd(obd);
2425                lprocfs_obd_cleanup(obd);
2426
2427                rc = obd_llog_finish(obd, 0);
2428                if (rc != 0)
2429                        CERROR("failed to cleanup llogging subsystems\n");
2430                break;
2431        }
2432        return rc;
2433}
2434
2435static int mdc_cleanup(struct obd_device *obd)
2436{
2437        struct client_obd *cli = &obd->u.cli;
2438
2439        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
2440        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
2441
2442        ptlrpcd_decref();
2443
2444        return client_obd_cleanup(obd);
2445}
2446
2447
2448static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
2449                         struct obd_device *tgt, int *index)
2450{
2451        struct llog_ctxt        *ctxt;
2452        int                      rc;
2453
2454        LASSERT(olg == &obd->obd_olg);
2455
2456        rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt,
2457                        &llog_client_ops);
2458        if (rc)
2459                return rc;
2460
2461        ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
2462        llog_initiator_connect(ctxt);
2463        llog_ctxt_put(ctxt);
2464
2465        return 0;
2466}
2467
2468static int mdc_llog_finish(struct obd_device *obd, int count)
2469{
2470        struct llog_ctxt *ctxt;
2471
2472        ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
2473        if (ctxt)
2474                llog_cleanup(NULL, ctxt);
2475
2476        return 0;
2477}
2478
2479static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
2480{
2481        struct lustre_cfg *lcfg = buf;
2482        struct lprocfs_static_vars lvars = { 0 };
2483        int rc = 0;
2484
2485        lprocfs_mdc_init_vars(&lvars);
2486        switch (lcfg->lcfg_command) {
2487        default:
2488                rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
2489                                              lcfg, obd);
2490                if (rc > 0)
2491                        rc = 0;
2492                break;
2493        }
2494        return(rc);
2495}
2496
2497
2498/* get remote permission for current user on fid */
2499int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
2500                        struct obd_capa *oc, __u32 suppgid,
2501                        struct ptlrpc_request **request)
2502{
2503        struct ptlrpc_request  *req;
2504        int                 rc;
2505
2506        LASSERT(client_is_remote(exp));
2507
2508        *request = NULL;
2509        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
2510        if (req == NULL)
2511                return -ENOMEM;
2512
2513        mdc_set_capa_size(req, &RMF_CAPA1, oc);
2514
2515        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
2516        if (rc) {
2517                ptlrpc_request_free(req);
2518                return rc;
2519        }
2520
2521        mdc_pack_body(req, fid, oc, OBD_MD_FLRMTPERM, 0, suppgid, 0);
2522
2523        req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
2524                             sizeof(struct mdt_remote_perm));
2525
2526        ptlrpc_request_set_replen(req);
2527
2528        rc = ptlrpc_queue_wait(req);
2529        if (rc)
2530                ptlrpc_req_finished(req);
2531        else
2532                *request = req;
2533        return rc;
2534}
2535
2536static int mdc_interpret_renew_capa(const struct lu_env *env,
2537                                    struct ptlrpc_request *req, void *args,
2538                                    int status)
2539{
2540        struct mdc_renew_capa_args *ra = args;
2541        struct mdt_body *body = NULL;
2542        struct lustre_capa *capa;
2543
2544        if (status)
2545                GOTO(out, capa = ERR_PTR(status));
2546
2547        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2548        if (body == NULL)
2549                GOTO(out, capa = ERR_PTR(-EFAULT));
2550
2551        if ((body->valid & OBD_MD_FLOSSCAPA) == 0)
2552                GOTO(out, capa = ERR_PTR(-ENOENT));
2553
2554        capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA2);
2555        if (!capa)
2556                GOTO(out, capa = ERR_PTR(-EFAULT));
2557out:
2558        ra->ra_cb(ra->ra_oc, capa);
2559        return 0;
2560}
2561
2562static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2563                          renew_capa_cb_t cb)
2564{
2565        struct ptlrpc_request *req;
2566        struct mdc_renew_capa_args *ra;
2567
2568        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_GETATTR,
2569                                        LUSTRE_MDS_VERSION, MDS_GETATTR);
2570        if (req == NULL)
2571                return -ENOMEM;
2572
2573        /* NB, OBD_MD_FLOSSCAPA is set here, but it doesn't necessarily mean the
2574         * capa to renew is oss capa.
2575         */
2576        mdc_pack_body(req, &oc->c_capa.lc_fid, oc, OBD_MD_FLOSSCAPA, 0, -1, 0);
2577        ptlrpc_request_set_replen(req);
2578
2579        CLASSERT(sizeof(*ra) <= sizeof(req->rq_async_args));
2580        ra = ptlrpc_req_async_args(req);
2581        ra->ra_oc = oc;
2582        ra->ra_cb = cb;
2583        req->rq_interpret_reply = mdc_interpret_renew_capa;
2584        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
2585        return 0;
2586}
2587
2588struct obd_ops mdc_obd_ops = {
2589        .o_owner            = THIS_MODULE,
2590        .o_setup            = mdc_setup,
2591        .o_precleanup       = mdc_precleanup,
2592        .o_cleanup        = mdc_cleanup,
2593        .o_add_conn      = client_import_add_conn,
2594        .o_del_conn      = client_import_del_conn,
2595        .o_connect          = client_connect_import,
2596        .o_disconnect       = client_disconnect_export,
2597        .o_iocontrol    = mdc_iocontrol,
2598        .o_set_info_async   = mdc_set_info_async,
2599        .o_statfs          = mdc_statfs,
2600        .o_pin        = mdc_pin,
2601        .o_unpin            = mdc_unpin,
2602        .o_fid_init         = client_fid_init,
2603        .o_fid_fini         = client_fid_fini,
2604        .o_fid_alloc    = mdc_fid_alloc,
2605        .o_import_event     = mdc_import_event,
2606        .o_llog_init    = mdc_llog_init,
2607        .o_llog_finish      = mdc_llog_finish,
2608        .o_get_info      = mdc_get_info,
2609        .o_process_config   = mdc_process_config,
2610        .o_get_uuid      = mdc_get_uuid,
2611        .o_quotactl      = mdc_quotactl,
2612        .o_quotacheck       = mdc_quotacheck
2613};
2614
2615struct md_ops mdc_md_ops = {
2616        .m_getstatus    = mdc_getstatus,
2617        .m_null_inode       = mdc_null_inode,
2618        .m_find_cbdata      = mdc_find_cbdata,
2619        .m_close            = mdc_close,
2620        .m_create          = mdc_create,
2621        .m_done_writing     = mdc_done_writing,
2622        .m_enqueue        = mdc_enqueue,
2623        .m_getattr        = mdc_getattr,
2624        .m_getattr_name     = mdc_getattr_name,
2625        .m_intent_lock      = mdc_intent_lock,
2626        .m_link      = mdc_link,
2627        .m_is_subdir    = mdc_is_subdir,
2628        .m_rename          = mdc_rename,
2629        .m_setattr        = mdc_setattr,
2630        .m_setxattr      = mdc_setxattr,
2631        .m_getxattr      = mdc_getxattr,
2632        .m_sync      = mdc_sync,
2633        .m_readpage      = mdc_readpage,
2634        .m_unlink          = mdc_unlink,
2635        .m_cancel_unused    = mdc_cancel_unused,
2636        .m_init_ea_size     = mdc_init_ea_size,
2637        .m_set_lock_data    = mdc_set_lock_data,
2638        .m_lock_match       = mdc_lock_match,
2639        .m_get_lustre_md    = mdc_get_lustre_md,
2640        .m_free_lustre_md   = mdc_free_lustre_md,
2641        .m_set_open_replay_data = mdc_set_open_replay_data,
2642        .m_clear_open_replay_data = mdc_clear_open_replay_data,
2643        .m_renew_capa       = mdc_renew_capa,
2644        .m_unpack_capa      = mdc_unpack_capa,
2645        .m_get_remote_perm  = mdc_get_remote_perm,
2646        .m_intent_getattr_async = mdc_intent_getattr_async,
2647        .m_revalidate_lock      = mdc_revalidate_lock
2648};
2649
2650int __init mdc_init(void)
2651{
2652        int rc;
2653        struct lprocfs_static_vars lvars = { 0 };
2654        lprocfs_mdc_init_vars(&lvars);
2655
2656        rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars,
2657                                 LUSTRE_MDC_NAME, NULL);
2658        return rc;
2659}
2660
2661static void /*__exit*/ mdc_exit(void)
2662{
2663        class_unregister_type(LUSTRE_MDC_NAME);
2664}
2665
2666MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2667MODULE_DESCRIPTION("Lustre Metadata Client");
2668MODULE_LICENSE("GPL");
2669
2670module_init(mdc_init);
2671module_exit(mdc_exit);
2672