linux/drivers/staging/lustre/lustre/mdc/mdc_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_MDC
  38
  39# include <linux/module.h>
  40# include <linux/pagemap.h>
  41# include <linux/miscdevice.h>
  42# include <linux/init.h>
  43# include <linux/utsname.h>
  44
  45#include <lustre_acl.h>
  46#include <obd_class.h>
  47#include <lustre_fid.h>
  48#include <lprocfs_status.h>
  49#include <lustre_param.h>
  50#include <lustre_log.h>
  51
  52#include "mdc_internal.h"
  53
  54#define REQUEST_MINOR 244
  55
  56struct mdc_renew_capa_args {
  57        struct obd_capa *ra_oc;
  58        renew_capa_cb_t  ra_cb;
  59};
  60
  61static int mdc_cleanup(struct obd_device *obd);
  62
  63int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
  64                    const struct req_msg_field *field, struct obd_capa **oc)
  65{
  66        struct lustre_capa *capa;
  67        struct obd_capa *c;
  68        ENTRY;
  69
  70        /* swabbed already in mdc_enqueue */
  71        capa = req_capsule_server_get(&req->rq_pill, field);
  72        if (capa == NULL)
  73                RETURN(-EPROTO);
  74
  75        c = alloc_capa(CAPA_SITE_CLIENT);
  76        if (IS_ERR(c)) {
  77                CDEBUG(D_INFO, "alloc capa failed!\n");
  78                RETURN(PTR_ERR(c));
  79        } else {
  80                c->c_capa = *capa;
  81                *oc = c;
  82                RETURN(0);
  83        }
  84}
  85
  86static inline int mdc_queue_wait(struct ptlrpc_request *req)
  87{
  88        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
  89        int rc;
  90
  91        /* mdc_enter_request() ensures that this client has no more
  92         * than cl_max_rpcs_in_flight RPCs simultaneously inf light
  93         * against an MDT. */
  94        rc = mdc_enter_request(cli);
  95        if (rc != 0)
  96                return rc;
  97
  98        rc = ptlrpc_queue_wait(req);
  99        mdc_exit_request(cli);
 100
 101        return rc;
 102}
 103
 104/* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 105/* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
 106static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
 107                          struct obd_capa **pc, int level, int msg_flags)
 108{
 109        struct ptlrpc_request *req;
 110        struct mdt_body       *body;
 111        int                 rc;
 112        ENTRY;
 113
 114        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_GETSTATUS,
 115                                        LUSTRE_MDS_VERSION, MDS_GETSTATUS);
 116        if (req == NULL)
 117                RETURN(-ENOMEM);
 118
 119        mdc_pack_body(req, NULL, NULL, 0, 0, -1, 0);
 120        lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
 121        req->rq_send_state = level;
 122
 123        ptlrpc_request_set_replen(req);
 124
 125        rc = ptlrpc_queue_wait(req);
 126        if (rc)
 127                GOTO(out, rc);
 128
 129        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 130        if (body == NULL)
 131                GOTO(out, rc = -EPROTO);
 132
 133        if (body->valid & OBD_MD_FLMDSCAPA) {
 134                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, pc);
 135                if (rc)
 136                        GOTO(out, rc);
 137        }
 138
 139        *rootfid = body->fid1;
 140        CDEBUG(D_NET,
 141               "root fid="DFID", last_committed="LPU64"\n",
 142               PFID(rootfid),
 143               lustre_msg_get_last_committed(req->rq_repmsg));
 144        EXIT;
 145out:
 146        ptlrpc_req_finished(req);
 147        return rc;
 148}
 149
 150/* This should be mdc_get_info("rootfid") */
 151int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
 152                  struct obd_capa **pc)
 153{
 154        return send_getstatus(class_exp2cliimp(exp), rootfid, pc,
 155                              LUSTRE_IMP_FULL, 0);
 156}
 157
 158/*
 159 * This function now is known to always saying that it will receive 4 buffers
 160 * from server. Even for cases when acl_size and md_size is zero, RPC header
 161 * will contain 4 fields and RPC itself will contain zero size fields. This is
 162 * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
 163 * and thus zero, it shrinks it, making zero size. The same story about
 164 * md_size. And this is course of problem when client waits for smaller number
 165 * of fields. This issue will be fixed later when client gets aware of RPC
 166 * layouts.  --umka
 167 */
 168static int mdc_getattr_common(struct obd_export *exp,
 169                              struct ptlrpc_request *req)
 170{
 171        struct req_capsule *pill = &req->rq_pill;
 172        struct mdt_body    *body;
 173        void           *eadata;
 174        int              rc;
 175        ENTRY;
 176
 177        /* Request message already built. */
 178        rc = ptlrpc_queue_wait(req);
 179        if (rc != 0)
 180                RETURN(rc);
 181
 182        /* sanity check for the reply */
 183        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
 184        if (body == NULL)
 185                RETURN(-EPROTO);
 186
 187        CDEBUG(D_NET, "mode: %o\n", body->mode);
 188
 189        if (body->eadatasize != 0) {
 190                mdc_update_max_ea_from_body(exp, body);
 191
 192                eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
 193                                                      body->eadatasize);
 194                if (eadata == NULL)
 195                        RETURN(-EPROTO);
 196        }
 197
 198        if (body->valid & OBD_MD_FLRMTPERM) {
 199                struct mdt_remote_perm *perm;
 200
 201                LASSERT(client_is_remote(exp));
 202                perm = req_capsule_server_swab_get(pill, &RMF_ACL,
 203                                                lustre_swab_mdt_remote_perm);
 204                if (perm == NULL)
 205                        RETURN(-EPROTO);
 206        }
 207
 208        if (body->valid & OBD_MD_FLMDSCAPA) {
 209                struct lustre_capa *capa;
 210                capa = req_capsule_server_get(pill, &RMF_CAPA1);
 211                if (capa == NULL)
 212                        RETURN(-EPROTO);
 213        }
 214
 215        RETURN(0);
 216}
 217
 218int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
 219                struct ptlrpc_request **request)
 220{
 221        struct ptlrpc_request *req;
 222        int                 rc;
 223        ENTRY;
 224
 225        /* Single MDS without an LMV case */
 226        if (op_data->op_flags & MF_GET_MDT_IDX) {
 227                op_data->op_mds = 0;
 228                RETURN(0);
 229        }
 230        *request = NULL;
 231        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
 232        if (req == NULL)
 233                RETURN(-ENOMEM);
 234
 235        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 236
 237        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
 238        if (rc) {
 239                ptlrpc_request_free(req);
 240                RETURN(rc);
 241        }
 242
 243        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
 244                      op_data->op_valid, op_data->op_mode, -1, 0);
 245
 246        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 247                             op_data->op_mode);
 248        if (op_data->op_valid & OBD_MD_FLRMTPERM) {
 249                LASSERT(client_is_remote(exp));
 250                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
 251                                     sizeof(struct mdt_remote_perm));
 252        }
 253        ptlrpc_request_set_replen(req);
 254
 255        rc = mdc_getattr_common(exp, req);
 256        if (rc)
 257                ptlrpc_req_finished(req);
 258        else
 259                *request = req;
 260        RETURN(rc);
 261}
 262
 263int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
 264                     struct ptlrpc_request **request)
 265{
 266        struct ptlrpc_request *req;
 267        int                 rc;
 268        ENTRY;
 269
 270        *request = NULL;
 271        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 272                                   &RQF_MDS_GETATTR_NAME);
 273        if (req == NULL)
 274                RETURN(-ENOMEM);
 275
 276        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 277        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 278                             op_data->op_namelen + 1);
 279
 280        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
 281        if (rc) {
 282                ptlrpc_request_free(req);
 283                RETURN(rc);
 284        }
 285
 286        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
 287                      op_data->op_valid, op_data->op_mode,
 288                      op_data->op_suppgids[0], 0);
 289
 290        if (op_data->op_name) {
 291                char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
 292                LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
 293                                op_data->op_namelen);
 294                memcpy(name, op_data->op_name, op_data->op_namelen);
 295        }
 296
 297        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 298                             op_data->op_mode);
 299        ptlrpc_request_set_replen(req);
 300
 301        rc = mdc_getattr_common(exp, req);
 302        if (rc)
 303                ptlrpc_req_finished(req);
 304        else
 305                *request = req;
 306        RETURN(rc);
 307}
 308
 309static int mdc_is_subdir(struct obd_export *exp,
 310                         const struct lu_fid *pfid,
 311                         const struct lu_fid *cfid,
 312                         struct ptlrpc_request **request)
 313{
 314        struct ptlrpc_request  *req;
 315        int                  rc;
 316
 317        ENTRY;
 318
 319        *request = NULL;
 320        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
 321                                        &RQF_MDS_IS_SUBDIR, LUSTRE_MDS_VERSION,
 322                                        MDS_IS_SUBDIR);
 323        if (req == NULL)
 324                RETURN(-ENOMEM);
 325
 326        mdc_is_subdir_pack(req, pfid, cfid, 0);
 327        ptlrpc_request_set_replen(req);
 328
 329        rc = ptlrpc_queue_wait(req);
 330        if (rc && rc != -EREMOTE)
 331                ptlrpc_req_finished(req);
 332        else
 333                *request = req;
 334        RETURN(rc);
 335}
 336
 337static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
 338                            const struct lu_fid *fid,
 339                            struct obd_capa *oc, int opcode, obd_valid valid,
 340                            const char *xattr_name, const char *input,
 341                            int input_size, int output_size, int flags,
 342                            __u32 suppgid, struct ptlrpc_request **request)
 343{
 344        struct ptlrpc_request *req;
 345        int   xattr_namelen = 0;
 346        char *tmp;
 347        int   rc;
 348        ENTRY;
 349
 350        *request = NULL;
 351        req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
 352        if (req == NULL)
 353                RETURN(-ENOMEM);
 354
 355        mdc_set_capa_size(req, &RMF_CAPA1, oc);
 356        if (xattr_name) {
 357                xattr_namelen = strlen(xattr_name) + 1;
 358                req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 359                                     xattr_namelen);
 360        }
 361        if (input_size) {
 362                LASSERT(input);
 363                req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
 364                                     input_size);
 365        }
 366
 367        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
 368        if (rc) {
 369                ptlrpc_request_free(req);
 370                RETURN(rc);
 371        }
 372
 373        if (opcode == MDS_REINT) {
 374                struct mdt_rec_setxattr *rec;
 375
 376                CLASSERT(sizeof(struct mdt_rec_setxattr) ==
 377                         sizeof(struct mdt_rec_reint));
 378                rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 379                rec->sx_opcode = REINT_SETXATTR;
 380                /* TODO:
 381                 *  cfs_curproc_fs{u,g}id() should replace
 382                 *  current->fs{u,g}id for portability.
 383                 */
 384                rec->sx_fsuid  = current_fsuid();
 385                rec->sx_fsgid  = current_fsgid();
 386                rec->sx_cap    = cfs_curproc_cap_pack();
 387                rec->sx_suppgid1 = suppgid;
 388                rec->sx_suppgid2 = -1;
 389                rec->sx_fid    = *fid;
 390                rec->sx_valid  = valid | OBD_MD_FLCTIME;
 391                rec->sx_time   = cfs_time_current_sec();
 392                rec->sx_size   = output_size;
 393                rec->sx_flags  = flags;
 394
 395                mdc_pack_capa(req, &RMF_CAPA1, oc);
 396        } else {
 397                mdc_pack_body(req, fid, oc, valid, output_size, suppgid, flags);
 398        }
 399
 400        if (xattr_name) {
 401                tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
 402                memcpy(tmp, xattr_name, xattr_namelen);
 403        }
 404        if (input_size) {
 405                tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
 406                memcpy(tmp, input, input_size);
 407        }
 408
 409        if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER))
 410                req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
 411                                     RCL_SERVER, output_size);
 412        ptlrpc_request_set_replen(req);
 413
 414        /* make rpc */
 415        if (opcode == MDS_REINT)
 416                mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 417
 418        rc = ptlrpc_queue_wait(req);
 419
 420        if (opcode == MDS_REINT)
 421                mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 422
 423        if (rc)
 424                ptlrpc_req_finished(req);
 425        else
 426                *request = req;
 427        RETURN(rc);
 428}
 429
 430int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
 431                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
 432                 const char *input, int input_size, int output_size,
 433                 int flags, __u32 suppgid, struct ptlrpc_request **request)
 434{
 435        return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
 436                                fid, oc, MDS_REINT, valid, xattr_name,
 437                                input, input_size, output_size, flags,
 438                                suppgid, request);
 439}
 440
 441int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
 442                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
 443                 const char *input, int input_size, int output_size,
 444                 int flags, struct ptlrpc_request **request)
 445{
 446        return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
 447                                fid, oc, MDS_GETXATTR, valid, xattr_name,
 448                                input, input_size, output_size, flags,
 449                                -1, request);
 450}
 451
 452#ifdef CONFIG_FS_POSIX_ACL
 453static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
 454{
 455        struct req_capsule     *pill = &req->rq_pill;
 456        struct mdt_body *body = md->body;
 457        struct posix_acl       *acl;
 458        void               *buf;
 459        int                  rc;
 460        ENTRY;
 461
 462        if (!body->aclsize)
 463                RETURN(0);
 464
 465        buf = req_capsule_server_sized_get(pill, &RMF_ACL, body->aclsize);
 466
 467        if (!buf)
 468                RETURN(-EPROTO);
 469
 470        acl = posix_acl_from_xattr(&init_user_ns, buf, body->aclsize);
 471        if (IS_ERR(acl)) {
 472                rc = PTR_ERR(acl);
 473                CERROR("convert xattr to acl: %d\n", rc);
 474                RETURN(rc);
 475        }
 476
 477        rc = posix_acl_valid(acl);
 478        if (rc) {
 479                CERROR("validate acl: %d\n", rc);
 480                posix_acl_release(acl);
 481                RETURN(rc);
 482        }
 483
 484        md->posix_acl = acl;
 485        RETURN(0);
 486}
 487#else
 488#define mdc_unpack_acl(req, md) 0
 489#endif
 490
 491int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
 492                      struct obd_export *dt_exp, struct obd_export *md_exp,
 493                      struct lustre_md *md)
 494{
 495        struct req_capsule *pill = &req->rq_pill;
 496        int rc;
 497        ENTRY;
 498
 499        LASSERT(md);
 500        memset(md, 0, sizeof(*md));
 501
 502        md->body = req_capsule_server_get(pill, &RMF_MDT_BODY);
 503        LASSERT(md->body != NULL);
 504
 505        if (md->body->valid & OBD_MD_FLEASIZE) {
 506                int lmmsize;
 507                struct lov_mds_md *lmm;
 508
 509                if (!S_ISREG(md->body->mode)) {
 510                        CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, should be a "
 511                               "regular file, but is not\n");
 512                        GOTO(out, rc = -EPROTO);
 513                }
 514
 515                if (md->body->eadatasize == 0) {
 516                        CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, "
 517                               "but eadatasize 0\n");
 518                        GOTO(out, rc = -EPROTO);
 519                }
 520                lmmsize = md->body->eadatasize;
 521                lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize);
 522                if (!lmm)
 523                        GOTO(out, rc = -EPROTO);
 524
 525                rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
 526                if (rc < 0)
 527                        GOTO(out, rc);
 528
 529                if (rc < sizeof(*md->lsm)) {
 530                        CDEBUG(D_INFO, "lsm size too small: "
 531                               "rc < sizeof (*md->lsm) (%d < %d)\n",
 532                               rc, (int)sizeof(*md->lsm));
 533                        GOTO(out, rc = -EPROTO);
 534                }
 535
 536        } else if (md->body->valid & OBD_MD_FLDIREA) {
 537                int lmvsize;
 538                struct lov_mds_md *lmv;
 539
 540                if(!S_ISDIR(md->body->mode)) {
 541                        CDEBUG(D_INFO, "OBD_MD_FLDIREA set, should be a "
 542                               "directory, but is not\n");
 543                        GOTO(out, rc = -EPROTO);
 544                }
 545
 546                if (md->body->eadatasize == 0) {
 547                        CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, "
 548                               "but eadatasize 0\n");
 549                        RETURN(-EPROTO);
 550                }
 551                if (md->body->valid & OBD_MD_MEA) {
 552                        lmvsize = md->body->eadatasize;
 553                        lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
 554                                                           lmvsize);
 555                        if (!lmv)
 556                                GOTO(out, rc = -EPROTO);
 557
 558                        rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
 559                                          lmvsize);
 560                        if (rc < 0)
 561                                GOTO(out, rc);
 562
 563                        if (rc < sizeof(*md->mea)) {
 564                                CDEBUG(D_INFO, "size too small:  "
 565                                       "rc < sizeof(*md->mea) (%d < %d)\n",
 566                                        rc, (int)sizeof(*md->mea));
 567                                GOTO(out, rc = -EPROTO);
 568                        }
 569                }
 570        }
 571        rc = 0;
 572
 573        if (md->body->valid & OBD_MD_FLRMTPERM) {
 574                /* remote permission */
 575                LASSERT(client_is_remote(exp));
 576                md->remote_perm = req_capsule_server_swab_get(pill, &RMF_ACL,
 577                                                lustre_swab_mdt_remote_perm);
 578                if (!md->remote_perm)
 579                        GOTO(out, rc = -EPROTO);
 580        }
 581        else if (md->body->valid & OBD_MD_FLACL) {
 582                /* for ACL, it's possible that FLACL is set but aclsize is zero.
 583                 * only when aclsize != 0 there's an actual segment for ACL
 584                 * in reply buffer.
 585                 */
 586                if (md->body->aclsize) {
 587                        rc = mdc_unpack_acl(req, md);
 588                        if (rc)
 589                                GOTO(out, rc);
 590#ifdef CONFIG_FS_POSIX_ACL
 591                } else {
 592                        md->posix_acl = NULL;
 593#endif
 594                }
 595        }
 596        if (md->body->valid & OBD_MD_FLMDSCAPA) {
 597                struct obd_capa *oc = NULL;
 598
 599                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, &oc);
 600                if (rc)
 601                        GOTO(out, rc);
 602                md->mds_capa = oc;
 603        }
 604
 605        if (md->body->valid & OBD_MD_FLOSSCAPA) {
 606                struct obd_capa *oc = NULL;
 607
 608                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA2, &oc);
 609                if (rc)
 610                        GOTO(out, rc);
 611                md->oss_capa = oc;
 612        }
 613
 614        EXIT;
 615out:
 616        if (rc) {
 617                if (md->oss_capa) {
 618                        capa_put(md->oss_capa);
 619                        md->oss_capa = NULL;
 620                }
 621                if (md->mds_capa) {
 622                        capa_put(md->mds_capa);
 623                        md->mds_capa = NULL;
 624                }
 625#ifdef CONFIG_FS_POSIX_ACL
 626                posix_acl_release(md->posix_acl);
 627#endif
 628                if (md->lsm)
 629                        obd_free_memmd(dt_exp, &md->lsm);
 630        }
 631        return rc;
 632}
 633
 634int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
 635{
 636        ENTRY;
 637        RETURN(0);
 638}
 639
 640/**
 641 * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
 642 * RPC chains.
 643 */
 644void mdc_replay_open(struct ptlrpc_request *req)
 645{
 646        struct md_open_data *mod = req->rq_cb_data;
 647        struct ptlrpc_request *close_req;
 648        struct obd_client_handle *och;
 649        struct lustre_handle old;
 650        struct mdt_body *body;
 651        ENTRY;
 652
 653        if (mod == NULL) {
 654                DEBUG_REQ(D_ERROR, req,
 655                          "Can't properly replay without open data.");
 656                EXIT;
 657                return;
 658        }
 659
 660        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 661        LASSERT(body != NULL);
 662
 663        och = mod->mod_och;
 664        if (och != NULL) {
 665                struct lustre_handle *file_fh;
 666
 667                LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
 668
 669                file_fh = &och->och_fh;
 670                CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
 671                       file_fh->cookie, body->handle.cookie);
 672                old = *file_fh;
 673                *file_fh = body->handle;
 674        }
 675        close_req = mod->mod_close_req;
 676        if (close_req != NULL) {
 677                __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
 678                struct mdt_ioepoch *epoch;
 679
 680                LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
 681                epoch = req_capsule_client_get(&close_req->rq_pill,
 682                                               &RMF_MDT_EPOCH);
 683                LASSERT(epoch);
 684
 685                if (och != NULL)
 686                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
 687                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
 688                epoch->handle = body->handle;
 689        }
 690        EXIT;
 691}
 692
 693void mdc_commit_open(struct ptlrpc_request *req)
 694{
 695        struct md_open_data *mod = req->rq_cb_data;
 696        if (mod == NULL)
 697                return;
 698
 699        /**
 700         * No need to touch md_open_data::mod_och, it holds a reference on
 701         * \var mod and will zero references to each other, \var mod will be
 702         * freed after that when md_open_data::mod_och will put the reference.
 703         */
 704
 705        /**
 706         * Do not let open request to disappear as it still may be needed
 707         * for close rpc to happen (it may happen on evict only, otherwise
 708         * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
 709         * called), just mark this rpc as committed to distinguish these 2
 710         * cases, see mdc_close() for details. The open request reference will
 711         * be put along with freeing \var mod.
 712         */
 713        ptlrpc_request_addref(req);
 714        spin_lock(&req->rq_lock);
 715        req->rq_committed = 1;
 716        spin_unlock(&req->rq_lock);
 717        req->rq_cb_data = NULL;
 718        obd_mod_put(mod);
 719}
 720
 721int mdc_set_open_replay_data(struct obd_export *exp,
 722                             struct obd_client_handle *och,
 723                             struct ptlrpc_request *open_req)
 724{
 725        struct md_open_data   *mod;
 726        struct mdt_rec_create *rec;
 727        struct mdt_body       *body;
 728        struct obd_import     *imp = open_req->rq_import;
 729        ENTRY;
 730
 731        if (!open_req->rq_replay)
 732                RETURN(0);
 733
 734        rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT);
 735        body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
 736        LASSERT(rec != NULL);
 737        /* Incoming message in my byte order (it's been swabbed). */
 738        /* Outgoing messages always in my byte order. */
 739        LASSERT(body != NULL);
 740
 741        /* Only if the import is replayable, we set replay_open data */
 742        if (och && imp->imp_replayable) {
 743                mod = obd_mod_alloc();
 744                if (mod == NULL) {
 745                        DEBUG_REQ(D_ERROR, open_req,
 746                                  "Can't allocate md_open_data");
 747                        RETURN(0);
 748                }
 749
 750                /**
 751                 * Take a reference on \var mod, to be freed on mdc_close().
 752                 * It protects \var mod from being freed on eviction (commit
 753                 * callback is called despite rq_replay flag).
 754                 * Another reference for \var och.
 755                 */
 756                obd_mod_get(mod);
 757                obd_mod_get(mod);
 758
 759                spin_lock(&open_req->rq_lock);
 760                och->och_mod = mod;
 761                mod->mod_och = och;
 762                mod->mod_open_req = open_req;
 763                open_req->rq_cb_data = mod;
 764                open_req->rq_commit_cb = mdc_commit_open;
 765                spin_unlock(&open_req->rq_lock);
 766        }
 767
 768        rec->cr_fid2 = body->fid1;
 769        rec->cr_ioepoch = body->ioepoch;
 770        rec->cr_old_handle.cookie = body->handle.cookie;
 771        open_req->rq_replay_cb = mdc_replay_open;
 772        if (!fid_is_sane(&body->fid1)) {
 773                DEBUG_REQ(D_ERROR, open_req, "Saving replay request with "
 774                          "insane fid");
 775                LBUG();
 776        }
 777
 778        DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
 779        RETURN(0);
 780}
 781
 782int mdc_clear_open_replay_data(struct obd_export *exp,
 783                               struct obd_client_handle *och)
 784{
 785        struct md_open_data *mod = och->och_mod;
 786        ENTRY;
 787
 788        /**
 789         * It is possible to not have \var mod in a case of eviction between
 790         * lookup and ll_file_open().
 791         **/
 792        if (mod == NULL)
 793                RETURN(0);
 794
 795        LASSERT(mod != LP_POISON);
 796
 797        mod->mod_och = NULL;
 798        och->och_mod = NULL;
 799        obd_mod_put(mod);
 800
 801        RETURN(0);
 802}
 803
 804/* Prepares the request for the replay by the given reply */
 805static void mdc_close_handle_reply(struct ptlrpc_request *req,
 806                                   struct md_op_data *op_data, int rc) {
 807        struct mdt_body  *repbody;
 808        struct mdt_ioepoch *epoch;
 809
 810        if (req && rc == -EAGAIN) {
 811                repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 812                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
 813
 814                epoch->flags |= MF_SOM_AU;
 815                if (repbody->valid & OBD_MD_FLGETATTRLOCK)
 816                        op_data->op_flags |= MF_GETATTR_LOCK;
 817        }
 818}
 819
 820int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 821              struct md_open_data *mod, struct ptlrpc_request **request)
 822{
 823        struct obd_device     *obd = class_exp2obd(exp);
 824        struct ptlrpc_request *req;
 825        int                 rc;
 826        ENTRY;
 827
 828        *request = NULL;
 829        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_CLOSE);
 830        if (req == NULL)
 831                RETURN(-ENOMEM);
 832
 833        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 834
 835        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
 836        if (rc) {
 837                ptlrpc_request_free(req);
 838                RETURN(rc);
 839        }
 840
 841        /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
 842         * portal whose threads are not taking any DLM locks and are therefore
 843         * always progressing */
 844        req->rq_request_portal = MDS_READPAGE_PORTAL;
 845        ptlrpc_at_set_req_timeout(req);
 846
 847        /* Ensure that this close's handle is fixed up during replay. */
 848        if (likely(mod != NULL)) {
 849                LASSERTF(mod->mod_open_req != NULL &&
 850                         mod->mod_open_req->rq_type != LI_POISON,
 851                         "POISONED open %p!\n", mod->mod_open_req);
 852
 853                mod->mod_close_req = req;
 854
 855                DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
 856                /* We no longer want to preserve this open for replay even
 857                 * though the open was committed. b=3632, b=3633 */
 858                spin_lock(&mod->mod_open_req->rq_lock);
 859                mod->mod_open_req->rq_replay = 0;
 860                spin_unlock(&mod->mod_open_req->rq_lock);
 861        } else {
 862                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
 863        }
 864
 865        mdc_close_pack(req, op_data);
 866
 867        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 868                             obd->u.cli.cl_max_mds_easize);
 869        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
 870                             obd->u.cli.cl_max_mds_cookiesize);
 871
 872        ptlrpc_request_set_replen(req);
 873
 874        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 875        rc = ptlrpc_queue_wait(req);
 876        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 877
 878        if (req->rq_repmsg == NULL) {
 879                CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
 880                       req->rq_status);
 881                if (rc == 0)
 882                        rc = req->rq_status ?: -EIO;
 883        } else if (rc == 0 || rc == -EAGAIN) {
 884                struct mdt_body *body;
 885
 886                rc = lustre_msg_get_status(req->rq_repmsg);
 887                if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
 888                        DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
 889                                  "= %d", rc);
 890                        if (rc > 0)
 891                                rc = -rc;
 892                }
 893                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 894                if (body == NULL)
 895                        rc = -EPROTO;
 896        } else if (rc == -ESTALE) {
 897                /**
 898                 * it can be allowed error after 3633 if open was committed and
 899                 * server failed before close was sent. Let's check if mod
 900                 * exists and return no error in that case
 901                 */
 902                if (mod) {
 903                        DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
 904                        LASSERT(mod->mod_open_req != NULL);
 905                        if (mod->mod_open_req->rq_committed)
 906                                rc = 0;
 907                }
 908        }
 909
 910        if (mod) {
 911                if (rc != 0)
 912                        mod->mod_close_req = NULL;
 913                /* Since now, mod is accessed through open_req only,
 914                 * thus close req does not keep a reference on mod anymore. */
 915                obd_mod_put(mod);
 916        }
 917        *request = req;
 918        mdc_close_handle_reply(req, op_data, rc);
 919        RETURN(rc);
 920}
 921
 922int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
 923                     struct md_open_data *mod)
 924{
 925        struct obd_device     *obd = class_exp2obd(exp);
 926        struct ptlrpc_request *req;
 927        int                 rc;
 928        ENTRY;
 929
 930        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 931                                   &RQF_MDS_DONE_WRITING);
 932        if (req == NULL)
 933                RETURN(-ENOMEM);
 934
 935        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 936        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
 937        if (rc) {
 938                ptlrpc_request_free(req);
 939                RETURN(rc);
 940        }
 941
 942        if (mod != NULL) {
 943                LASSERTF(mod->mod_open_req != NULL &&
 944                         mod->mod_open_req->rq_type != LI_POISON,
 945                         "POISONED setattr %p!\n", mod->mod_open_req);
 946
 947                mod->mod_close_req = req;
 948                DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
 949                /* We no longer want to preserve this setattr for replay even
 950                 * though the open was committed. b=3632, b=3633 */
 951                spin_lock(&mod->mod_open_req->rq_lock);
 952                mod->mod_open_req->rq_replay = 0;
 953                spin_unlock(&mod->mod_open_req->rq_lock);
 954        }
 955
 956        mdc_close_pack(req, op_data);
 957        ptlrpc_request_set_replen(req);
 958
 959        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 960        rc = ptlrpc_queue_wait(req);
 961        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 962
 963        if (rc == -ESTALE) {
 964                /**
 965                 * it can be allowed error after 3633 if open or setattr were
 966                 * committed and server failed before close was sent.
 967                 * Let's check if mod exists and return no error in that case
 968                 */
 969                if (mod) {
 970                        LASSERT(mod->mod_open_req != NULL);
 971                        if (mod->mod_open_req->rq_committed)
 972                                rc = 0;
 973                }
 974        }
 975
 976        if (mod) {
 977                if (rc != 0)
 978                        mod->mod_close_req = NULL;
 979                /* Since now, mod is accessed through setattr req only,
 980                 * thus DW req does not keep a reference on mod anymore. */
 981                obd_mod_put(mod);
 982        }
 983
 984        mdc_close_handle_reply(req, op_data, rc);
 985        ptlrpc_req_finished(req);
 986        RETURN(rc);
 987}
 988
 989
 990int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data,
 991                 struct page **pages, struct ptlrpc_request **request)
 992{
 993        struct ptlrpc_request   *req;
 994        struct ptlrpc_bulk_desc *desc;
 995        int                   i;
 996        wait_queue_head_t             waitq;
 997        int                   resends = 0;
 998        struct l_wait_info       lwi;
 999        int                   rc;
1000        ENTRY;
1001
1002        *request = NULL;
1003        init_waitqueue_head(&waitq);
1004
1005restart_bulk:
1006        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
1007        if (req == NULL)
1008                RETURN(-ENOMEM);
1009
1010        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1011
1012        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
1013        if (rc) {
1014                ptlrpc_request_free(req);
1015                RETURN(rc);
1016        }
1017
1018        req->rq_request_portal = MDS_READPAGE_PORTAL;
1019        ptlrpc_at_set_req_timeout(req);
1020
1021        desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
1022                                    MDS_BULK_PORTAL);
1023        if (desc == NULL) {
1024                ptlrpc_request_free(req);
1025                RETURN(-ENOMEM);
1026        }
1027
1028        /* NB req now owns desc and will free it when it gets freed */
1029        for (i = 0; i < op_data->op_npages; i++)
1030                ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
1031
1032        mdc_readdir_pack(req, op_data->op_offset,
1033                         PAGE_CACHE_SIZE * op_data->op_npages,
1034                         &op_data->op_fid1, op_data->op_capa1);
1035
1036        ptlrpc_request_set_replen(req);
1037        rc = ptlrpc_queue_wait(req);
1038        if (rc) {
1039                ptlrpc_req_finished(req);
1040                if (rc != -ETIMEDOUT)
1041                        RETURN(rc);
1042
1043                resends++;
1044                if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1045                        CERROR("too many resend retries, returning error\n");
1046                        RETURN(-EIO);
1047                }
1048                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1049                l_wait_event(waitq, 0, &lwi);
1050
1051                goto restart_bulk;
1052        }
1053
1054        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1055                                          req->rq_bulk->bd_nob_transferred);
1056        if (rc < 0) {
1057                ptlrpc_req_finished(req);
1058                RETURN(rc);
1059        }
1060
1061        if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
1062                CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
1063                        req->rq_bulk->bd_nob_transferred,
1064                        PAGE_CACHE_SIZE * op_data->op_npages);
1065                ptlrpc_req_finished(req);
1066                RETURN(-EPROTO);
1067        }
1068
1069        *request = req;
1070        RETURN(0);
1071}
1072
1073static int mdc_statfs(const struct lu_env *env,
1074                      struct obd_export *exp, struct obd_statfs *osfs,
1075                      __u64 max_age, __u32 flags)
1076{
1077        struct obd_device     *obd = class_exp2obd(exp);
1078        struct ptlrpc_request *req;
1079        struct obd_statfs     *msfs;
1080        struct obd_import     *imp = NULL;
1081        int                 rc;
1082        ENTRY;
1083
1084        /*
1085         * Since the request might also come from lprocfs, so we need
1086         * sync this with client_disconnect_export Bug15684
1087         */
1088        down_read(&obd->u.cli.cl_sem);
1089        if (obd->u.cli.cl_import)
1090                imp = class_import_get(obd->u.cli.cl_import);
1091        up_read(&obd->u.cli.cl_sem);
1092        if (!imp)
1093                RETURN(-ENODEV);
1094
1095        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS,
1096                                        LUSTRE_MDS_VERSION, MDS_STATFS);
1097        if (req == NULL)
1098                GOTO(output, rc = -ENOMEM);
1099
1100        ptlrpc_request_set_replen(req);
1101
1102        if (flags & OBD_STATFS_NODELAY) {
1103                /* procfs requests not want stay in wait for avoid deadlock */
1104                req->rq_no_resend = 1;
1105                req->rq_no_delay = 1;
1106        }
1107
1108        rc = ptlrpc_queue_wait(req);
1109        if (rc) {
1110                /* check connection error first */
1111                if (imp->imp_connect_error)
1112                        rc = imp->imp_connect_error;
1113                GOTO(out, rc);
1114        }
1115
1116        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1117        if (msfs == NULL)
1118                GOTO(out, rc = -EPROTO);
1119
1120        *osfs = *msfs;
1121        EXIT;
1122out:
1123        ptlrpc_req_finished(req);
1124output:
1125        class_import_put(imp);
1126        return rc;
1127}
1128
1129static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
1130{
1131        __u32 keylen, vallen;
1132        void *key;
1133        int rc;
1134
1135        if (gf->gf_pathlen > PATH_MAX)
1136                RETURN(-ENAMETOOLONG);
1137        if (gf->gf_pathlen < 2)
1138                RETURN(-EOVERFLOW);
1139
1140        /* Key is KEY_FID2PATH + getinfo_fid2path description */
1141        keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf);
1142        OBD_ALLOC(key, keylen);
1143        if (key == NULL)
1144                RETURN(-ENOMEM);
1145        memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH));
1146        memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf));
1147
1148        CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
1149               PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno);
1150
1151        if (!fid_is_sane(&gf->gf_fid))
1152                GOTO(out, rc = -EINVAL);
1153
1154        /* Val is struct getinfo_fid2path result plus path */
1155        vallen = sizeof(*gf) + gf->gf_pathlen;
1156
1157        rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
1158        if (rc != 0 && rc != -EREMOTE)
1159                GOTO(out, rc);
1160
1161        if (vallen <= sizeof(*gf))
1162                GOTO(out, rc = -EPROTO);
1163        else if (vallen > sizeof(*gf) + gf->gf_pathlen)
1164                GOTO(out, rc = -EOVERFLOW);
1165
1166        CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n%s\n",
1167               PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path);
1168
1169out:
1170        OBD_FREE(key, keylen);
1171        return rc;
1172}
1173
1174static int mdc_ioc_hsm_progress(struct obd_export *exp,
1175                                struct hsm_progress_kernel *hpk)
1176{
1177        struct obd_import               *imp = class_exp2cliimp(exp);
1178        struct hsm_progress_kernel      *req_hpk;
1179        struct ptlrpc_request           *req;
1180        int                              rc;
1181        ENTRY;
1182
1183        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS,
1184                                        LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS);
1185        if (req == NULL)
1186                GOTO(out, rc = -ENOMEM);
1187
1188        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1189
1190        /* Copy hsm_progress struct */
1191        req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
1192        if (req_hpk == NULL)
1193                GOTO(out, rc = -EPROTO);
1194
1195        *req_hpk = *hpk;
1196
1197        ptlrpc_request_set_replen(req);
1198
1199        rc = mdc_queue_wait(req);
1200        GOTO(out, rc);
1201out:
1202        ptlrpc_req_finished(req);
1203        return rc;
1204}
1205
1206static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
1207{
1208        __u32                   *archive_mask;
1209        struct ptlrpc_request   *req;
1210        int                      rc;
1211        ENTRY;
1212
1213        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER,
1214                                        LUSTRE_MDS_VERSION,
1215                                        MDS_HSM_CT_REGISTER);
1216        if (req == NULL)
1217                GOTO(out, rc = -ENOMEM);
1218
1219        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1220
1221        /* Copy hsm_progress struct */
1222        archive_mask = req_capsule_client_get(&req->rq_pill,
1223                                              &RMF_MDS_HSM_ARCHIVE);
1224        if (archive_mask == NULL)
1225                GOTO(out, rc = -EPROTO);
1226
1227        *archive_mask = archives;
1228
1229        ptlrpc_request_set_replen(req);
1230
1231        rc = mdc_queue_wait(req);
1232        GOTO(out, rc);
1233out:
1234        ptlrpc_req_finished(req);
1235        return rc;
1236}
1237
1238static int mdc_ioc_hsm_current_action(struct obd_export *exp,
1239                                      struct md_op_data *op_data)
1240{
1241        struct hsm_current_action       *hca = op_data->op_data;
1242        struct hsm_current_action       *req_hca;
1243        struct ptlrpc_request           *req;
1244        int                              rc;
1245        ENTRY;
1246
1247        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1248                                   &RQF_MDS_HSM_ACTION);
1249        if (req == NULL)
1250                RETURN(-ENOMEM);
1251
1252        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1253
1254        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
1255        if (rc) {
1256                ptlrpc_request_free(req);
1257                RETURN(rc);
1258        }
1259
1260        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1261                      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1262
1263        ptlrpc_request_set_replen(req);
1264
1265        rc = mdc_queue_wait(req);
1266        if (rc)
1267                GOTO(out, rc);
1268
1269        req_hca = req_capsule_server_get(&req->rq_pill,
1270                                         &RMF_MDS_HSM_CURRENT_ACTION);
1271        if (req_hca == NULL)
1272                GOTO(out, rc = -EPROTO);
1273
1274        *hca = *req_hca;
1275
1276        EXIT;
1277out:
1278        ptlrpc_req_finished(req);
1279        return rc;
1280}
1281
1282static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
1283{
1284        struct ptlrpc_request   *req;
1285        int                      rc;
1286        ENTRY;
1287
1288        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_UNREGISTER,
1289                                        LUSTRE_MDS_VERSION,
1290                                        MDS_HSM_CT_UNREGISTER);
1291        if (req == NULL)
1292                GOTO(out, rc = -ENOMEM);
1293
1294        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1295
1296        ptlrpc_request_set_replen(req);
1297
1298        rc = mdc_queue_wait(req);
1299        GOTO(out, rc);
1300out:
1301        ptlrpc_req_finished(req);
1302        return rc;
1303}
1304
1305static int mdc_ioc_hsm_state_get(struct obd_export *exp,
1306                                 struct md_op_data *op_data)
1307{
1308        struct hsm_user_state   *hus = op_data->op_data;
1309        struct hsm_user_state   *req_hus;
1310        struct ptlrpc_request   *req;
1311        int                      rc;
1312        ENTRY;
1313
1314        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1315                                   &RQF_MDS_HSM_STATE_GET);
1316        if (req == NULL)
1317                RETURN(-ENOMEM);
1318
1319        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1320
1321        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
1322        if (rc != 0) {
1323                ptlrpc_request_free(req);
1324                RETURN(rc);
1325        }
1326
1327        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1328                      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1329
1330        ptlrpc_request_set_replen(req);
1331
1332        rc = mdc_queue_wait(req);
1333        if (rc)
1334                GOTO(out, rc);
1335
1336        req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
1337        if (req_hus == NULL)
1338                GOTO(out, rc = -EPROTO);
1339
1340        *hus = *req_hus;
1341
1342        EXIT;
1343out:
1344        ptlrpc_req_finished(req);
1345        return rc;
1346}
1347
1348static int mdc_ioc_hsm_state_set(struct obd_export *exp,
1349                                 struct md_op_data *op_data)
1350{
1351        struct hsm_state_set    *hss = op_data->op_data;
1352        struct hsm_state_set    *req_hss;
1353        struct ptlrpc_request   *req;
1354        int                      rc;
1355        ENTRY;
1356
1357        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1358                                   &RQF_MDS_HSM_STATE_SET);
1359        if (req == NULL)
1360                RETURN(-ENOMEM);
1361
1362        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1363
1364        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
1365        if (rc) {
1366                ptlrpc_request_free(req);
1367                RETURN(rc);
1368        }
1369
1370        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1371                      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1372
1373        /* Copy states */
1374        req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
1375        if (req_hss == NULL)
1376                GOTO(out, rc = -EPROTO);
1377        *req_hss = *hss;
1378
1379        ptlrpc_request_set_replen(req);
1380
1381        rc = mdc_queue_wait(req);
1382        GOTO(out, rc);
1383
1384        EXIT;
1385out:
1386        ptlrpc_req_finished(req);
1387        return rc;
1388}
1389
1390static int mdc_ioc_hsm_request(struct obd_export *exp,
1391                               struct hsm_user_request *hur)
1392{
1393        struct obd_import       *imp = class_exp2cliimp(exp);
1394        struct ptlrpc_request   *req;
1395        struct hsm_request      *req_hr;
1396        struct hsm_user_item    *req_hui;
1397        char                    *req_opaque;
1398        int                      rc;
1399        ENTRY;
1400
1401        req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
1402        if (req == NULL)
1403                GOTO(out, rc = -ENOMEM);
1404
1405        req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
1406                             hur->hur_request.hr_itemcount
1407                             * sizeof(struct hsm_user_item));
1408        req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
1409                             hur->hur_request.hr_data_len);
1410
1411        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
1412        if (rc) {
1413                ptlrpc_request_free(req);
1414                RETURN(rc);
1415        }
1416
1417        mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1418
1419        /* Copy hsm_request struct */
1420        req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
1421        if (req_hr == NULL)
1422                GOTO(out, rc = -EPROTO);
1423        *req_hr = hur->hur_request;
1424
1425        /* Copy hsm_user_item structs */
1426        req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
1427        if (req_hui == NULL)
1428                GOTO(out, rc = -EPROTO);
1429        memcpy(req_hui, hur->hur_user_item,
1430               hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
1431
1432        /* Copy opaque field */
1433        req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
1434        if (req_opaque == NULL)
1435                GOTO(out, rc = -EPROTO);
1436        memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
1437
1438        ptlrpc_request_set_replen(req);
1439
1440        rc = mdc_queue_wait(req);
1441        GOTO(out, rc);
1442
1443out:
1444        ptlrpc_req_finished(req);
1445        return rc;
1446}
1447
1448static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
1449{
1450        struct kuc_hdr *lh = (struct kuc_hdr *)buf;
1451
1452        LASSERT(len <= CR_MAXSIZE);
1453
1454        lh->kuc_magic = KUC_MAGIC;
1455        lh->kuc_transport = KUC_TRANSPORT_CHANGELOG;
1456        lh->kuc_flags = flags;
1457        lh->kuc_msgtype = CL_RECORD;
1458        lh->kuc_msglen = len;
1459        return lh;
1460}
1461
1462#define D_CHANGELOG 0
1463
1464struct changelog_show {
1465        __u64           cs_startrec;
1466        __u32           cs_flags;
1467        struct file     *cs_fp;
1468        char            *cs_buf;
1469        struct obd_device *cs_obd;
1470};
1471
1472static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
1473                             struct llog_rec_hdr *hdr, void *data)
1474{
1475        struct changelog_show *cs = data;
1476        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
1477        struct kuc_hdr *lh;
1478        int len, rc;
1479        ENTRY;
1480
1481        if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
1482                rc = -EINVAL;
1483                CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
1484                       cs->cs_obd->obd_name, rec->cr_hdr.lrh_type,
1485                       rec->cr.cr_type, rc);
1486                RETURN(rc);
1487        }
1488
1489        if (rec->cr.cr_index < cs->cs_startrec) {
1490                /* Skip entries earlier than what we are interested in */
1491                CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n",
1492                       rec->cr.cr_index, cs->cs_startrec);
1493                RETURN(0);
1494        }
1495
1496        CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
1497                " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
1498                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
1499                rec->cr.cr_flags & CLF_FLAGMASK,
1500                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
1501                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
1502
1503        len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
1504
1505        /* Set up the message */
1506        lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags);
1507        memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
1508
1509        rc = libcfs_kkuc_msg_put(cs->cs_fp, lh);
1510        CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len,rc);
1511
1512        RETURN(rc);
1513}
1514
1515static int mdc_changelog_send_thread(void *csdata)
1516{
1517        struct changelog_show *cs = csdata;
1518        struct llog_ctxt *ctxt = NULL;
1519        struct llog_handle *llh = NULL;
1520        struct kuc_hdr *kuch;
1521        int rc;
1522
1523        CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n",
1524               cs->cs_fp, cs->cs_startrec);
1525
1526        OBD_ALLOC(cs->cs_buf, CR_MAXSIZE);
1527        if (cs->cs_buf == NULL)
1528                GOTO(out, rc = -ENOMEM);
1529
1530        /* Set up the remote catalog handle */
1531        ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
1532        if (ctxt == NULL)
1533                GOTO(out, rc = -ENOENT);
1534        rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
1535                       LLOG_OPEN_EXISTS);
1536        if (rc) {
1537                CERROR("%s: fail to open changelog catalog: rc = %d\n",
1538                       cs->cs_obd->obd_name, rc);
1539                GOTO(out, rc);
1540        }
1541        rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL);
1542        if (rc) {
1543                CERROR("llog_init_handle failed %d\n", rc);
1544                GOTO(out, rc);
1545        }
1546
1547        rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
1548
1549        /* Send EOF no matter what our result */
1550        if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch),
1551                                      cs->cs_flags))) {
1552                kuch->kuc_msgtype = CL_EOF;
1553                libcfs_kkuc_msg_put(cs->cs_fp, kuch);
1554        }
1555
1556out:
1557        fput(cs->cs_fp);
1558        if (llh)
1559                llog_cat_close(NULL, llh);
1560        if (ctxt)
1561                llog_ctxt_put(ctxt);
1562        if (cs->cs_buf)
1563                OBD_FREE(cs->cs_buf, CR_MAXSIZE);
1564        OBD_FREE_PTR(cs);
1565        return rc;
1566}
1567
1568static int mdc_ioc_changelog_send(struct obd_device *obd,
1569                                  struct ioc_changelog *icc)
1570{
1571        struct changelog_show *cs;
1572        int rc;
1573
1574        /* Freed in mdc_changelog_send_thread */
1575        OBD_ALLOC_PTR(cs);
1576        if (!cs)
1577                return -ENOMEM;
1578
1579        cs->cs_obd = obd;
1580        cs->cs_startrec = icc->icc_recno;
1581        /* matching fput in mdc_changelog_send_thread */
1582        cs->cs_fp = fget(icc->icc_id);
1583        cs->cs_flags = icc->icc_flags;
1584
1585        /*
1586         * New thread because we should return to user app before
1587         * writing into our pipe
1588         */
1589        rc = PTR_ERR(kthread_run(mdc_changelog_send_thread, cs,
1590                                 "mdc_clg_send_thread"));
1591        if (!IS_ERR_VALUE(rc)) {
1592                CDEBUG(D_CHANGELOG, "start changelog thread\n");
1593                return 0;
1594        }
1595
1596        CERROR("Failed to start changelog thread: %d\n", rc);
1597        OBD_FREE_PTR(cs);
1598        return rc;
1599}
1600
1601static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
1602                                struct lustre_kernelcomm *lk);
1603
1604static int mdc_quotacheck(struct obd_device *unused, struct obd_export *exp,
1605                          struct obd_quotactl *oqctl)
1606{
1607        struct client_obd       *cli = &exp->exp_obd->u.cli;
1608        struct ptlrpc_request   *req;
1609        struct obd_quotactl     *body;
1610        int                   rc;
1611        ENTRY;
1612
1613        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1614                                        &RQF_MDS_QUOTACHECK, LUSTRE_MDS_VERSION,
1615                                        MDS_QUOTACHECK);
1616        if (req == NULL)
1617                RETURN(-ENOMEM);
1618
1619        body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1620        *body = *oqctl;
1621
1622        ptlrpc_request_set_replen(req);
1623
1624        /* the next poll will find -ENODATA, that means quotacheck is
1625         * going on */
1626        cli->cl_qchk_stat = -ENODATA;
1627        rc = ptlrpc_queue_wait(req);
1628        if (rc)
1629                cli->cl_qchk_stat = rc;
1630        ptlrpc_req_finished(req);
1631        RETURN(rc);
1632}
1633
1634static int mdc_quota_poll_check(struct obd_export *exp,
1635                                struct if_quotacheck *qchk)
1636{
1637        struct client_obd *cli = &exp->exp_obd->u.cli;
1638        int rc;
1639        ENTRY;
1640
1641        qchk->obd_uuid = cli->cl_target_uuid;
1642        memcpy(qchk->obd_type, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME));
1643
1644        rc = cli->cl_qchk_stat;
1645        /* the client is not the previous one */
1646        if (rc == CL_NOT_QUOTACHECKED)
1647                rc = -EINTR;
1648        RETURN(rc);
1649}
1650
1651static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
1652                        struct obd_quotactl *oqctl)
1653{
1654        struct ptlrpc_request   *req;
1655        struct obd_quotactl     *oqc;
1656        int                   rc;
1657        ENTRY;
1658
1659        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1660                                        &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION,
1661                                        MDS_QUOTACTL);
1662        if (req == NULL)
1663                RETURN(-ENOMEM);
1664
1665        oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1666        *oqc = *oqctl;
1667
1668        ptlrpc_request_set_replen(req);
1669        ptlrpc_at_set_req_timeout(req);
1670        req->rq_no_resend = 1;
1671
1672        rc = ptlrpc_queue_wait(req);
1673        if (rc)
1674                CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
1675
1676        if (req->rq_repmsg &&
1677            (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) {
1678                *oqctl = *oqc;
1679        } else if (!rc) {
1680                CERROR ("Can't unpack obd_quotactl\n");
1681                rc = -EPROTO;
1682        }
1683        ptlrpc_req_finished(req);
1684
1685        RETURN(rc);
1686}
1687
1688static int mdc_ioc_swap_layouts(struct obd_export *exp,
1689                                struct md_op_data *op_data)
1690{
1691        LIST_HEAD(cancels);
1692        struct ptlrpc_request   *req;
1693        int                      rc, count;
1694        struct mdc_swap_layouts *msl, *payload;
1695        ENTRY;
1696
1697        msl = op_data->op_data;
1698
1699        /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
1700         * first thing it will do is to cancel the 2 layout
1701         * locks hold by this client.
1702         * So the client must cancel its layout locks on the 2 fids
1703         * with the request RPC to avoid extra RPC round trips
1704         */
1705        count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
1706                                        LCK_CR, MDS_INODELOCK_LAYOUT);
1707        count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
1708                                         LCK_CR, MDS_INODELOCK_LAYOUT);
1709
1710        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1711                                   &RQF_MDS_SWAP_LAYOUTS);
1712        if (req == NULL) {
1713                ldlm_lock_list_put(&cancels, l_bl_ast, count);
1714                RETURN(-ENOMEM);
1715        }
1716
1717        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1718        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
1719
1720        rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
1721        if (rc) {
1722                ptlrpc_request_free(req);
1723                RETURN(rc);
1724        }
1725
1726        mdc_swap_layouts_pack(req, op_data);
1727
1728        payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
1729        LASSERT(payload);
1730
1731        *payload = *msl;
1732
1733        ptlrpc_request_set_replen(req);
1734
1735        rc = ptlrpc_queue_wait(req);
1736        if (rc)
1737                GOTO(out, rc);
1738        EXIT;
1739
1740out:
1741        ptlrpc_req_finished(req);
1742        return rc;
1743}
1744
1745static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1746                         void *karg, void *uarg)
1747{
1748        struct obd_device *obd = exp->exp_obd;
1749        struct obd_ioctl_data *data = karg;
1750        struct obd_import *imp = obd->u.cli.cl_import;
1751        struct llog_ctxt *ctxt;
1752        int rc;
1753        ENTRY;
1754
1755        if (!try_module_get(THIS_MODULE)) {
1756                CERROR("Can't get module. Is it alive?");
1757                return -EINVAL;
1758        }
1759        switch (cmd) {
1760        case OBD_IOC_CHANGELOG_SEND:
1761                rc = mdc_ioc_changelog_send(obd, karg);
1762                GOTO(out, rc);
1763        case OBD_IOC_CHANGELOG_CLEAR: {
1764                struct ioc_changelog *icc = karg;
1765                struct changelog_setinfo cs =
1766                        {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id};
1767                rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR),
1768                                        KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
1769                                        NULL);
1770                GOTO(out, rc);
1771        }
1772        case OBD_IOC_FID2PATH:
1773                rc = mdc_ioc_fid2path(exp, karg);
1774                GOTO(out, rc);
1775        case LL_IOC_HSM_CT_START:
1776                rc = mdc_ioc_hsm_ct_start(exp, karg);
1777                GOTO(out, rc);
1778        case LL_IOC_HSM_PROGRESS:
1779                rc = mdc_ioc_hsm_progress(exp, karg);
1780                GOTO(out, rc);
1781        case LL_IOC_HSM_STATE_GET:
1782                rc = mdc_ioc_hsm_state_get(exp, karg);
1783                GOTO(out, rc);
1784        case LL_IOC_HSM_STATE_SET:
1785                rc = mdc_ioc_hsm_state_set(exp, karg);
1786        case LL_IOC_HSM_ACTION:
1787                rc = mdc_ioc_hsm_current_action(exp, karg);
1788                GOTO(out, rc);
1789        case LL_IOC_HSM_REQUEST:
1790                rc = mdc_ioc_hsm_request(exp, karg);
1791                GOTO(out, rc);
1792        case OBD_IOC_CLIENT_RECOVER:
1793                rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
1794                if (rc < 0)
1795                        GOTO(out, rc);
1796                GOTO(out, rc = 0);
1797        case IOC_OSC_SET_ACTIVE:
1798                rc = ptlrpc_set_import_active(imp, data->ioc_offset);
1799                GOTO(out, rc);
1800        case OBD_IOC_PARSE: {
1801                ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
1802                rc = class_config_parse_llog(NULL, ctxt, data->ioc_inlbuf1,
1803                                             NULL);
1804                llog_ctxt_put(ctxt);
1805                GOTO(out, rc);
1806        }
1807        case OBD_IOC_LLOG_INFO:
1808        case OBD_IOC_LLOG_PRINT: {
1809                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1810                rc = llog_ioctl(NULL, ctxt, cmd, data);
1811                llog_ctxt_put(ctxt);
1812                GOTO(out, rc);
1813        }
1814        case OBD_IOC_POLL_QUOTACHECK:
1815                rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg);
1816                GOTO(out, rc);
1817        case OBD_IOC_PING_TARGET:
1818                rc = ptlrpc_obd_ping(obd);
1819                GOTO(out, rc);
1820        /*
1821         * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
1822         * LMV instead of MDC. But when the cluster is upgraded from 1.8,
1823         * there'd be no LMV layer thus we might be called here. Eventually
1824         * this code should be removed.
1825         * bz20731, LU-592.
1826         */
1827        case IOC_OBD_STATFS: {
1828                struct obd_statfs stat_buf = {0};
1829
1830                if (*((__u32 *) data->ioc_inlbuf2) != 0)
1831                        GOTO(out, rc = -ENODEV);
1832
1833                /* copy UUID */
1834                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
1835                                     min((int) data->ioc_plen2,
1836                                         (int) sizeof(struct obd_uuid))))
1837                        GOTO(out, rc = -EFAULT);
1838
1839                rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
1840                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1841                                0);
1842                if (rc != 0)
1843                        GOTO(out, rc);
1844
1845                if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1846                                     min((int) data->ioc_plen1,
1847                                         (int) sizeof(stat_buf))))
1848                        GOTO(out, rc = -EFAULT);
1849
1850                GOTO(out, rc = 0);
1851        }
1852        case OBD_IOC_QUOTACTL: {
1853                struct if_quotactl *qctl = karg;
1854                struct obd_quotactl *oqctl;
1855
1856                OBD_ALLOC_PTR(oqctl);
1857                if (!oqctl)
1858                        RETURN(-ENOMEM);
1859
1860                QCTL_COPY(oqctl, qctl);
1861                rc = obd_quotactl(exp, oqctl);
1862                if (rc == 0) {
1863                        QCTL_COPY(qctl, oqctl);
1864                        qctl->qc_valid = QC_MDTIDX;
1865                        qctl->obd_uuid = obd->u.cli.cl_target_uuid;
1866                }
1867                OBD_FREE_PTR(oqctl);
1868                break;
1869        }
1870        case LL_IOC_GET_CONNECT_FLAGS: {
1871                if (copy_to_user(uarg,
1872                                     exp_connect_flags_ptr(exp),
1873                                     sizeof(__u64)))
1874                        GOTO(out, rc = -EFAULT);
1875                else
1876                        GOTO(out, rc = 0);
1877        }
1878        case LL_IOC_LOV_SWAP_LAYOUTS: {
1879                rc = mdc_ioc_swap_layouts(exp, karg);
1880                break;
1881        }
1882        default:
1883                CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
1884                GOTO(out, rc = -ENOTTY);
1885        }
1886out:
1887        module_put(THIS_MODULE);
1888
1889        return rc;
1890}
1891
1892int mdc_get_info_rpc(struct obd_export *exp,
1893                     obd_count keylen, void *key,
1894                     int vallen, void *val)
1895{
1896        struct obd_import      *imp = class_exp2cliimp(exp);
1897        struct ptlrpc_request  *req;
1898        char               *tmp;
1899        int                  rc = -EINVAL;
1900        ENTRY;
1901
1902        req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO);
1903        if (req == NULL)
1904                RETURN(-ENOMEM);
1905
1906        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
1907                             RCL_CLIENT, keylen);
1908        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
1909                             RCL_CLIENT, sizeof(__u32));
1910
1911        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
1912        if (rc) {
1913                ptlrpc_request_free(req);
1914                RETURN(rc);
1915        }
1916
1917        tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
1918        memcpy(tmp, key, keylen);
1919        tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
1920        memcpy(tmp, &vallen, sizeof(__u32));
1921
1922        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
1923                             RCL_SERVER, vallen);
1924        ptlrpc_request_set_replen(req);
1925
1926        rc = ptlrpc_queue_wait(req);
1927        /* -EREMOTE means the get_info result is partial, and it needs to
1928         * continue on another MDT, see fid2path part in lmv_iocontrol */
1929        if (rc == 0 || rc == -EREMOTE) {
1930                tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
1931                memcpy(val, tmp, vallen);
1932                if (ptlrpc_rep_need_swab(req)) {
1933                        if (KEY_IS(KEY_FID2PATH))
1934                                lustre_swab_fid2path(val);
1935                }
1936        }
1937        ptlrpc_req_finished(req);
1938
1939        RETURN(rc);
1940}
1941
1942static void lustre_swab_hai(struct hsm_action_item *h)
1943{
1944        __swab32s(&h->hai_len);
1945        __swab32s(&h->hai_action);
1946        lustre_swab_lu_fid(&h->hai_fid);
1947        lustre_swab_lu_fid(&h->hai_dfid);
1948        __swab64s(&h->hai_cookie);
1949        __swab64s(&h->hai_extent.offset);
1950        __swab64s(&h->hai_extent.length);
1951        __swab64s(&h->hai_gid);
1952}
1953
1954static void lustre_swab_hal(struct hsm_action_list *h)
1955{
1956        struct hsm_action_item  *hai;
1957        int                      i;
1958
1959        __swab32s(&h->hal_version);
1960        __swab32s(&h->hal_count);
1961        __swab32s(&h->hal_archive_id);
1962        __swab64s(&h->hal_flags);
1963        hai = hai_zero(h);
1964        for (i = 0; i < h->hal_count; i++) {
1965                lustre_swab_hai(hai);
1966                hai = hai_next(hai);
1967        }
1968}
1969
1970static void lustre_swab_kuch(struct kuc_hdr *l)
1971{
1972        __swab16s(&l->kuc_magic);
1973        /* __u8 l->kuc_transport */
1974        __swab16s(&l->kuc_msgtype);
1975        __swab16s(&l->kuc_msglen);
1976}
1977
1978static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
1979                                struct lustre_kernelcomm *lk)
1980{
1981        struct obd_import  *imp = class_exp2cliimp(exp);
1982        __u32               archive = lk->lk_data;
1983        int                 rc = 0;
1984
1985        if (lk->lk_group != KUC_GRP_HSM) {
1986                CERROR("Bad copytool group %d\n", lk->lk_group);
1987                return -EINVAL;
1988        }
1989
1990        CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd,
1991               lk->lk_uid, lk->lk_group, lk->lk_flags);
1992
1993        if (lk->lk_flags & LK_FLG_STOP) {
1994                rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
1995                /* Unregister with the coordinator */
1996                if (rc == 0)
1997                        rc = mdc_ioc_hsm_ct_unregister(imp);
1998        } else {
1999                struct file *fp = fget(lk->lk_wfd);
2000
2001                rc = libcfs_kkuc_group_add(fp, lk->lk_uid, lk->lk_group,
2002                                           lk->lk_data);
2003                if (rc && fp)
2004                        fput(fp);
2005                if (rc == 0)
2006                        rc = mdc_ioc_hsm_ct_register(imp, archive);
2007        }
2008
2009        return rc;
2010}
2011
2012/**
2013 * Send a message to any listening copytools
2014 * @param val KUC message (kuc_hdr + hsm_action_list)
2015 * @param len total length of message
2016 */
2017static int mdc_hsm_copytool_send(int len, void *val)
2018{
2019        struct kuc_hdr          *lh = (struct kuc_hdr *)val;
2020        struct hsm_action_list  *hal = (struct hsm_action_list *)(lh + 1);
2021        int                      rc;
2022        ENTRY;
2023
2024        if (len < sizeof(*lh) + sizeof(*hal)) {
2025                CERROR("Short HSM message %d < %d\n", len,
2026                       (int) (sizeof(*lh) + sizeof(*hal)));
2027                RETURN(-EPROTO);
2028        }
2029        if (lh->kuc_magic == __swab16(KUC_MAGIC)) {
2030                lustre_swab_kuch(lh);
2031                lustre_swab_hal(hal);
2032        } else if (lh->kuc_magic != KUC_MAGIC) {
2033                CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC);
2034                RETURN(-EPROTO);
2035        }
2036
2037        CDEBUG(D_HSM, " Received message mg=%x t=%d m=%d l=%d actions=%d "
2038               "on %s\n",
2039               lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype,
2040               lh->kuc_msglen, hal->hal_count, hal->hal_fsname);
2041
2042        /* Broadcast to HSM listeners */
2043        rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh);
2044
2045        RETURN(rc);
2046}
2047
2048/**
2049 * callback function passed to kuc for re-registering each HSM copytool
2050 * running on MDC, after MDT shutdown/recovery.
2051 * @param data archive id served by the copytool
2052 * @param cb_arg callback argument (obd_import)
2053 */
2054static int mdc_hsm_ct_reregister(__u32 data, void *cb_arg)
2055{
2056        struct obd_import       *imp = (struct obd_import *)cb_arg;
2057        __u32                    archive = data;
2058        int                      rc;
2059
2060        CDEBUG(D_HA, "recover copytool registration to MDT (archive=%#x)\n",
2061               archive);
2062        rc = mdc_ioc_hsm_ct_register(imp, archive);
2063
2064        /* ignore error if the copytool is already registered */
2065        return ((rc != 0) && (rc != -EEXIST)) ? rc : 0;
2066}
2067
2068/**
2069 * Re-establish all kuc contexts with MDT
2070 * after MDT shutdown/recovery.
2071 */
2072static int mdc_kuc_reregister(struct obd_import *imp)
2073{
2074        /* re-register HSM agents */
2075        return libcfs_kkuc_group_foreach(KUC_GRP_HSM, mdc_hsm_ct_reregister,
2076                                         (void *)imp);
2077}
2078
2079int mdc_set_info_async(const struct lu_env *env,
2080                       struct obd_export *exp,
2081                       obd_count keylen, void *key,
2082                       obd_count vallen, void *val,
2083                       struct ptlrpc_request_set *set)
2084{
2085        struct obd_import       *imp = class_exp2cliimp(exp);
2086        int                      rc;
2087        ENTRY;
2088
2089        if (KEY_IS(KEY_READ_ONLY)) {
2090                if (vallen != sizeof(int))
2091                        RETURN(-EINVAL);
2092
2093                spin_lock(&imp->imp_lock);
2094                if (*((int *)val)) {
2095                        imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
2096                        imp->imp_connect_data.ocd_connect_flags |=
2097                                                        OBD_CONNECT_RDONLY;
2098                } else {
2099                        imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
2100                        imp->imp_connect_data.ocd_connect_flags &=
2101                                                        ~OBD_CONNECT_RDONLY;
2102                }
2103                spin_unlock(&imp->imp_lock);
2104
2105                rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2106                                       keylen, key, vallen, val, set);
2107                RETURN(rc);
2108        }
2109        if (KEY_IS(KEY_SPTLRPC_CONF)) {
2110                sptlrpc_conf_client_adapt(exp->exp_obd);
2111                RETURN(0);
2112        }
2113        if (KEY_IS(KEY_FLUSH_CTX)) {
2114                sptlrpc_import_flush_my_ctx(imp);
2115                RETURN(0);
2116        }
2117        if (KEY_IS(KEY_MDS_CONN)) {
2118                /* mds-mds import */
2119                spin_lock(&imp->imp_lock);
2120                imp->imp_server_timeout = 1;
2121                spin_unlock(&imp->imp_lock);
2122                imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
2123                CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name);
2124                RETURN(0);
2125        }
2126        if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2127                rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2128                                       keylen, key, vallen, val, set);
2129                RETURN(rc);
2130        }
2131        if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) {
2132                rc = mdc_hsm_copytool_send(vallen, val);
2133                RETURN(rc);
2134        }
2135
2136        CERROR("Unknown key %s\n", (char *)key);
2137        RETURN(-EINVAL);
2138}
2139
2140int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
2141                 __u32 keylen, void *key, __u32 *vallen, void *val,
2142                 struct lov_stripe_md *lsm)
2143{
2144        int rc = -EINVAL;
2145
2146        if (KEY_IS(KEY_MAX_EASIZE)) {
2147                int mdsize, *max_easize;
2148
2149                if (*vallen != sizeof(int))
2150                        RETURN(-EINVAL);
2151                mdsize = *(int*)val;
2152                if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
2153                        exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
2154                max_easize = val;
2155                *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
2156                RETURN(0);
2157        } else if (KEY_IS(KEY_CONN_DATA)) {
2158                struct obd_import *imp = class_exp2cliimp(exp);
2159                struct obd_connect_data *data = val;
2160
2161                if (*vallen != sizeof(*data))
2162                        RETURN(-EINVAL);
2163
2164                *data = imp->imp_connect_data;
2165                RETURN(0);
2166        } else if (KEY_IS(KEY_TGT_COUNT)) {
2167                *((int *)val) = 1;
2168                RETURN(0);
2169        }
2170
2171        rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val);
2172
2173        RETURN(rc);
2174}
2175
2176static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid,
2177                   struct obd_capa *oc, struct obd_client_handle *handle,
2178                   int flags)
2179{
2180        struct ptlrpc_request *req;
2181        struct mdt_body       *body;
2182        int                 rc;
2183        ENTRY;
2184
2185        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_PIN);
2186        if (req == NULL)
2187                RETURN(-ENOMEM);
2188
2189        mdc_set_capa_size(req, &RMF_CAPA1, oc);
2190
2191        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_PIN);
2192        if (rc) {
2193                ptlrpc_request_free(req);
2194                RETURN(rc);
2195        }
2196
2197        mdc_pack_body(req, fid, oc, 0, 0, -1, flags);
2198
2199        ptlrpc_request_set_replen(req);
2200
2201        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2202        rc = ptlrpc_queue_wait(req);
2203        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2204        if (rc) {
2205                CERROR("Pin failed: %d\n", rc);
2206                GOTO(err_out, rc);
2207        }
2208
2209        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2210        if (body == NULL)
2211                GOTO(err_out, rc = -EPROTO);
2212
2213        handle->och_fh = body->handle;
2214        handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
2215
2216        handle->och_mod = obd_mod_alloc();
2217        if (handle->och_mod == NULL) {
2218                DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
2219                GOTO(err_out, rc = -ENOMEM);
2220        }
2221        handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
2222
2223        RETURN(0);
2224
2225err_out:
2226        ptlrpc_req_finished(req);
2227        RETURN(rc);
2228}
2229
2230static int mdc_unpin(struct obd_export *exp, struct obd_client_handle *handle,
2231                     int flag)
2232{
2233        struct ptlrpc_request *req;
2234        struct mdt_body       *body;
2235        int                 rc;
2236        ENTRY;
2237
2238        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_UNPIN,
2239                                        LUSTRE_MDS_VERSION, MDS_UNPIN);
2240        if (req == NULL)
2241                RETURN(-ENOMEM);
2242
2243        body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY);
2244        body->handle = handle->och_fh;
2245        body->flags = flag;
2246
2247        ptlrpc_request_set_replen(req);
2248
2249        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2250        rc = ptlrpc_queue_wait(req);
2251        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
2252
2253        if (rc != 0)
2254                CERROR("Unpin failed: %d\n", rc);
2255
2256        ptlrpc_req_finished(req);
2257        ptlrpc_req_finished(handle->och_mod->mod_open_req);
2258
2259        obd_mod_put(handle->och_mod);
2260        RETURN(rc);
2261}
2262
2263int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
2264             struct obd_capa *oc, struct ptlrpc_request **request)
2265{
2266        struct ptlrpc_request *req;
2267        int                 rc;
2268        ENTRY;
2269
2270        *request = NULL;
2271        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC);
2272        if (req == NULL)
2273                RETURN(-ENOMEM);
2274
2275        mdc_set_capa_size(req, &RMF_CAPA1, oc);
2276
2277        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
2278        if (rc) {
2279                ptlrpc_request_free(req);
2280                RETURN(rc);
2281        }
2282
2283        mdc_pack_body(req, fid, oc, 0, 0, -1, 0);
2284
2285        ptlrpc_request_set_replen(req);
2286
2287        rc = ptlrpc_queue_wait(req);
2288        if (rc)
2289                ptlrpc_req_finished(req);
2290        else
2291                *request = req;
2292        RETURN(rc);
2293}
2294
2295static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
2296                            enum obd_import_event event)
2297{
2298        int rc = 0;
2299
2300        LASSERT(imp->imp_obd == obd);
2301
2302        switch (event) {
2303        case IMP_EVENT_DISCON: {
2304#if 0
2305                /* XXX Pass event up to OBDs stack. used only for FLD now */
2306                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL);
2307#endif
2308                break;
2309        }
2310        case IMP_EVENT_INACTIVE: {
2311                struct client_obd *cli = &obd->u.cli;
2312                /*
2313                 * Flush current sequence to make client obtain new one
2314                 * from server in case of disconnect/reconnect.
2315                 */
2316                if (cli->cl_seq != NULL)
2317                        seq_client_flush(cli->cl_seq);
2318
2319                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2320                break;
2321        }
2322        case IMP_EVENT_INVALIDATE: {
2323                struct ldlm_namespace *ns = obd->obd_namespace;
2324
2325                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2326
2327                break;
2328        }
2329        case IMP_EVENT_ACTIVE:
2330                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2331                /* restore re-establish kuc registration after reconnecting */
2332                if (rc == 0)
2333                        rc = mdc_kuc_reregister(imp);
2334                break;
2335        case IMP_EVENT_OCD:
2336                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2337                break;
2338        case IMP_EVENT_DEACTIVATE:
2339        case IMP_EVENT_ACTIVATE:
2340                break;
2341        default:
2342                CERROR("Unknown import event %x\n", event);
2343                LBUG();
2344        }
2345        RETURN(rc);
2346}
2347
2348int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
2349                  struct md_op_data *op_data)
2350{
2351        struct client_obd *cli = &exp->exp_obd->u.cli;
2352        struct lu_client_seq *seq = cli->cl_seq;
2353        ENTRY;
2354        RETURN(seq_client_alloc_fid(NULL, seq, fid));
2355}
2356
2357struct obd_uuid *mdc_get_uuid(struct obd_export *exp) {
2358        struct client_obd *cli = &exp->exp_obd->u.cli;
2359        return &cli->cl_target_uuid;
2360}
2361
2362/**
2363 * Determine whether the lock can be canceled before replaying it during
2364 * recovery, non zero value will be return if the lock can be canceled,
2365 * or zero returned for not
2366 */
2367static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
2368{
2369        if (lock->l_resource->lr_type != LDLM_IBITS)
2370                RETURN(0);
2371
2372        /* FIXME: if we ever get into a situation where there are too many
2373         * opened files with open locks on a single node, then we really
2374         * should replay these open locks to reget it */
2375        if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN)
2376                RETURN(0);
2377
2378        RETURN(1);
2379}
2380
2381static int mdc_resource_inode_free(struct ldlm_resource *res)
2382{
2383        if (res->lr_lvb_inode)
2384                res->lr_lvb_inode = NULL;
2385
2386        return 0;
2387}
2388
2389struct ldlm_valblock_ops inode_lvbo = {
2390        lvbo_free: mdc_resource_inode_free
2391};
2392
2393static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
2394{
2395        struct client_obd *cli = &obd->u.cli;
2396        struct lprocfs_static_vars lvars = { 0 };
2397        int rc;
2398        ENTRY;
2399
2400        OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
2401        if (!cli->cl_rpc_lock)
2402                RETURN(-ENOMEM);
2403        mdc_init_rpc_lock(cli->cl_rpc_lock);
2404
2405        ptlrpcd_addref();
2406
2407        OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
2408        if (!cli->cl_close_lock)
2409                GOTO(err_rpc_lock, rc = -ENOMEM);
2410        mdc_init_rpc_lock(cli->cl_close_lock);
2411
2412        rc = client_obd_setup(obd, cfg);
2413        if (rc)
2414                GOTO(err_close_lock, rc);
2415        lprocfs_mdc_init_vars(&lvars);
2416        lprocfs_obd_setup(obd, lvars.obd_vars);
2417        sptlrpc_lprocfs_cliobd_attach(obd);
2418        ptlrpc_lprocfs_register_obd(obd);
2419
2420        ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
2421
2422        obd->obd_namespace->ns_lvbo = &inode_lvbo;
2423
2424        rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
2425        if (rc) {
2426                mdc_cleanup(obd);
2427                CERROR("failed to setup llogging subsystems\n");
2428        }
2429
2430        RETURN(rc);
2431
2432err_close_lock:
2433        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
2434err_rpc_lock:
2435        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
2436        ptlrpcd_decref();
2437        RETURN(rc);
2438}
2439
2440/* Initialize the default and maximum LOV EA and cookie sizes.  This allows
2441 * us to make MDS RPCs with large enough reply buffers to hold the
2442 * maximum-sized (= maximum striped) EA and cookie without having to
2443 * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
2444static int mdc_init_ea_size(struct obd_export *exp, int easize,
2445                     int def_easize, int cookiesize)
2446{
2447        struct obd_device *obd = exp->exp_obd;
2448        struct client_obd *cli = &obd->u.cli;
2449        ENTRY;
2450
2451        if (cli->cl_max_mds_easize < easize)
2452                cli->cl_max_mds_easize = easize;
2453
2454        if (cli->cl_default_mds_easize < def_easize)
2455                cli->cl_default_mds_easize = def_easize;
2456
2457        if (cli->cl_max_mds_cookiesize < cookiesize)
2458                cli->cl_max_mds_cookiesize = cookiesize;
2459
2460        RETURN(0);
2461}
2462
2463static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2464{
2465        int rc = 0;
2466        ENTRY;
2467
2468        switch (stage) {
2469        case OBD_CLEANUP_EARLY:
2470                break;
2471        case OBD_CLEANUP_EXPORTS:
2472                /* Failsafe, ok if racy */
2473                if (obd->obd_type->typ_refcnt <= 1)
2474                        libcfs_kkuc_group_rem(0, KUC_GRP_HSM);
2475
2476                obd_cleanup_client_import(obd);
2477                ptlrpc_lprocfs_unregister_obd(obd);
2478                lprocfs_obd_cleanup(obd);
2479
2480                rc = obd_llog_finish(obd, 0);
2481                if (rc != 0)
2482                        CERROR("failed to cleanup llogging subsystems\n");
2483                break;
2484        }
2485        RETURN(rc);
2486}
2487
2488static int mdc_cleanup(struct obd_device *obd)
2489{
2490        struct client_obd *cli = &obd->u.cli;
2491
2492        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
2493        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
2494
2495        ptlrpcd_decref();
2496
2497        return client_obd_cleanup(obd);
2498}
2499
2500
2501static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
2502                         struct obd_device *tgt, int *index)
2503{
2504        struct llog_ctxt        *ctxt;
2505        int                      rc;
2506
2507        ENTRY;
2508
2509        LASSERT(olg == &obd->obd_olg);
2510
2511        rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt,
2512                        &llog_client_ops);
2513        if (rc)
2514                RETURN(rc);
2515
2516        ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
2517        llog_initiator_connect(ctxt);
2518        llog_ctxt_put(ctxt);
2519
2520        RETURN(0);
2521}
2522
2523static int mdc_llog_finish(struct obd_device *obd, int count)
2524{
2525        struct llog_ctxt *ctxt;
2526
2527        ENTRY;
2528
2529        ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
2530        if (ctxt)
2531                llog_cleanup(NULL, ctxt);
2532
2533        RETURN(0);
2534}
2535
2536static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
2537{
2538        struct lustre_cfg *lcfg = buf;
2539        struct lprocfs_static_vars lvars = { 0 };
2540        int rc = 0;
2541
2542        lprocfs_mdc_init_vars(&lvars);
2543        switch (lcfg->lcfg_command) {
2544        default:
2545                rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
2546                                              lcfg, obd);
2547                if (rc > 0)
2548                        rc = 0;
2549                break;
2550        }
2551        return(rc);
2552}
2553
2554
2555/* get remote permission for current user on fid */
2556int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
2557                        struct obd_capa *oc, __u32 suppgid,
2558                        struct ptlrpc_request **request)
2559{
2560        struct ptlrpc_request  *req;
2561        int                 rc;
2562        ENTRY;
2563
2564        LASSERT(client_is_remote(exp));
2565
2566        *request = NULL;
2567        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
2568        if (req == NULL)
2569                RETURN(-ENOMEM);
2570
2571        mdc_set_capa_size(req, &RMF_CAPA1, oc);
2572
2573        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
2574        if (rc) {
2575                ptlrpc_request_free(req);
2576                RETURN(rc);
2577        }
2578
2579        mdc_pack_body(req, fid, oc, OBD_MD_FLRMTPERM, 0, suppgid, 0);
2580
2581        req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
2582                             sizeof(struct mdt_remote_perm));
2583
2584        ptlrpc_request_set_replen(req);
2585
2586        rc = ptlrpc_queue_wait(req);
2587        if (rc)
2588                ptlrpc_req_finished(req);
2589        else
2590                *request = req;
2591        RETURN(rc);
2592}
2593
2594static int mdc_interpret_renew_capa(const struct lu_env *env,
2595                                    struct ptlrpc_request *req, void *args,
2596                                    int status)
2597{
2598        struct mdc_renew_capa_args *ra = args;
2599        struct mdt_body *body = NULL;
2600        struct lustre_capa *capa;
2601        ENTRY;
2602
2603        if (status)
2604                GOTO(out, capa = ERR_PTR(status));
2605
2606        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2607        if (body == NULL)
2608                GOTO(out, capa = ERR_PTR(-EFAULT));
2609
2610        if ((body->valid & OBD_MD_FLOSSCAPA) == 0)
2611                GOTO(out, capa = ERR_PTR(-ENOENT));
2612
2613        capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA2);
2614        if (!capa)
2615                GOTO(out, capa = ERR_PTR(-EFAULT));
2616        EXIT;
2617out:
2618        ra->ra_cb(ra->ra_oc, capa);
2619        return 0;
2620}
2621
2622static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2623                          renew_capa_cb_t cb)
2624{
2625        struct ptlrpc_request *req;
2626        struct mdc_renew_capa_args *ra;
2627        ENTRY;
2628
2629        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_GETATTR,
2630                                        LUSTRE_MDS_VERSION, MDS_GETATTR);
2631        if (req == NULL)
2632                RETURN(-ENOMEM);
2633
2634        /* NB, OBD_MD_FLOSSCAPA is set here, but it doesn't necessarily mean the
2635         * capa to renew is oss capa.
2636         */
2637        mdc_pack_body(req, &oc->c_capa.lc_fid, oc, OBD_MD_FLOSSCAPA, 0, -1, 0);
2638        ptlrpc_request_set_replen(req);
2639
2640        CLASSERT(sizeof(*ra) <= sizeof(req->rq_async_args));
2641        ra = ptlrpc_req_async_args(req);
2642        ra->ra_oc = oc;
2643        ra->ra_cb = cb;
2644        req->rq_interpret_reply = mdc_interpret_renew_capa;
2645        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
2646        RETURN(0);
2647}
2648
2649static int mdc_connect(const struct lu_env *env,
2650                       struct obd_export **exp,
2651                       struct obd_device *obd, struct obd_uuid *cluuid,
2652                       struct obd_connect_data *data,
2653                       void *localdata)
2654{
2655        struct obd_import *imp = obd->u.cli.cl_import;
2656
2657        /* mds-mds import features */
2658        if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) {
2659                spin_lock(&imp->imp_lock);
2660                imp->imp_server_timeout = 1;
2661                spin_unlock(&imp->imp_lock);
2662                imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
2663                CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n",
2664                       obd->obd_name);
2665        }
2666
2667        return client_connect_import(env, exp, obd, cluuid, data, NULL);
2668}
2669
2670struct obd_ops mdc_obd_ops = {
2671        .o_owner            = THIS_MODULE,
2672        .o_setup            = mdc_setup,
2673        .o_precleanup       = mdc_precleanup,
2674        .o_cleanup        = mdc_cleanup,
2675        .o_add_conn      = client_import_add_conn,
2676        .o_del_conn      = client_import_del_conn,
2677        .o_connect        = mdc_connect,
2678        .o_disconnect       = client_disconnect_export,
2679        .o_iocontrol    = mdc_iocontrol,
2680        .o_set_info_async   = mdc_set_info_async,
2681        .o_statfs          = mdc_statfs,
2682        .o_pin        = mdc_pin,
2683        .o_unpin            = mdc_unpin,
2684        .o_fid_init         = client_fid_init,
2685        .o_fid_fini         = client_fid_fini,
2686        .o_fid_alloc    = mdc_fid_alloc,
2687        .o_import_event     = mdc_import_event,
2688        .o_llog_init    = mdc_llog_init,
2689        .o_llog_finish      = mdc_llog_finish,
2690        .o_get_info      = mdc_get_info,
2691        .o_process_config   = mdc_process_config,
2692        .o_get_uuid      = mdc_get_uuid,
2693        .o_quotactl      = mdc_quotactl,
2694        .o_quotacheck       = mdc_quotacheck
2695};
2696
2697struct md_ops mdc_md_ops = {
2698        .m_getstatus    = mdc_getstatus,
2699        .m_null_inode       = mdc_null_inode,
2700        .m_find_cbdata      = mdc_find_cbdata,
2701        .m_close            = mdc_close,
2702        .m_create          = mdc_create,
2703        .m_done_writing     = mdc_done_writing,
2704        .m_enqueue        = mdc_enqueue,
2705        .m_getattr        = mdc_getattr,
2706        .m_getattr_name     = mdc_getattr_name,
2707        .m_intent_lock      = mdc_intent_lock,
2708        .m_link      = mdc_link,
2709        .m_is_subdir    = mdc_is_subdir,
2710        .m_rename          = mdc_rename,
2711        .m_setattr        = mdc_setattr,
2712        .m_setxattr      = mdc_setxattr,
2713        .m_getxattr      = mdc_getxattr,
2714        .m_sync      = mdc_sync,
2715        .m_readpage      = mdc_readpage,
2716        .m_unlink          = mdc_unlink,
2717        .m_cancel_unused    = mdc_cancel_unused,
2718        .m_init_ea_size     = mdc_init_ea_size,
2719        .m_set_lock_data    = mdc_set_lock_data,
2720        .m_lock_match       = mdc_lock_match,
2721        .m_get_lustre_md    = mdc_get_lustre_md,
2722        .m_free_lustre_md   = mdc_free_lustre_md,
2723        .m_set_open_replay_data = mdc_set_open_replay_data,
2724        .m_clear_open_replay_data = mdc_clear_open_replay_data,
2725        .m_renew_capa       = mdc_renew_capa,
2726        .m_unpack_capa      = mdc_unpack_capa,
2727        .m_get_remote_perm  = mdc_get_remote_perm,
2728        .m_intent_getattr_async = mdc_intent_getattr_async,
2729        .m_revalidate_lock      = mdc_revalidate_lock
2730};
2731
2732int __init mdc_init(void)
2733{
2734        int rc;
2735        struct lprocfs_static_vars lvars = { 0 };
2736        lprocfs_mdc_init_vars(&lvars);
2737
2738        rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars,
2739                                 LUSTRE_MDC_NAME, NULL);
2740        RETURN(rc);
2741}
2742
2743static void /*__exit*/ mdc_exit(void)
2744{
2745        class_unregister_type(LUSTRE_MDC_NAME);
2746}
2747
2748MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2749MODULE_DESCRIPTION("Lustre Metadata Client");
2750MODULE_LICENSE("GPL");
2751
2752module_init(mdc_init);
2753module_exit(mdc_exit);
2754