linux/drivers/staging/lustre/lustre/mdc/mdc_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 */
  32
  33#define DEBUG_SUBSYSTEM S_MDC
  34
  35# include <linux/module.h>
  36# include <linux/pagemap.h>
  37# include <linux/miscdevice.h>
  38# include <linux/init.h>
  39# include <linux/utsname.h>
  40
  41#include "../include/lustre_acl.h"
  42#include "../include/lustre/lustre_ioctl.h"
  43#include "../include/obd_class.h"
  44#include "../include/lustre_lmv.h"
  45#include "../include/lustre_fid.h"
  46#include "../include/lprocfs_status.h"
  47#include "../include/lustre_param.h"
  48#include "../include/lustre_log.h"
  49#include "../include/lustre_kernelcomm.h"
  50
  51#include "mdc_internal.h"
  52
  53#define REQUEST_MINOR 244
  54
  55static int mdc_cleanup(struct obd_device *obd);
  56
  57static inline int mdc_queue_wait(struct ptlrpc_request *req)
  58{
  59        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
  60        int rc;
  61
  62        /* obd_get_request_slot() ensures that this client has no more
  63         * than cl_max_rpcs_in_flight RPCs simultaneously inf light
  64         * against an MDT.
  65         */
  66        rc = obd_get_request_slot(cli);
  67        if (rc != 0)
  68                return rc;
  69
  70        rc = ptlrpc_queue_wait(req);
  71        obd_put_request_slot(cli);
  72
  73        return rc;
  74}
  75
  76static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid)
  77{
  78        struct ptlrpc_request *req;
  79        struct mdt_body       *body;
  80        int                 rc;
  81
  82        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
  83                                        &RQF_MDS_GETSTATUS,
  84                                        LUSTRE_MDS_VERSION, MDS_GETSTATUS);
  85        if (!req)
  86                return -ENOMEM;
  87
  88        mdc_pack_body(req, NULL, 0, 0, -1, 0);
  89        req->rq_send_state = LUSTRE_IMP_FULL;
  90
  91        ptlrpc_request_set_replen(req);
  92
  93        rc = ptlrpc_queue_wait(req);
  94        if (rc)
  95                goto out;
  96
  97        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
  98        if (!body) {
  99                rc = -EPROTO;
 100                goto out;
 101        }
 102
 103        *rootfid = body->mbo_fid1;
 104        CDEBUG(D_NET,
 105               "root fid="DFID", last_committed=%llu\n",
 106               PFID(rootfid),
 107               lustre_msg_get_last_committed(req->rq_repmsg));
 108out:
 109        ptlrpc_req_finished(req);
 110        return rc;
 111}
 112
 113/*
 114 * This function now is known to always saying that it will receive 4 buffers
 115 * from server. Even for cases when acl_size and md_size is zero, RPC header
 116 * will contain 4 fields and RPC itself will contain zero size fields. This is
 117 * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
 118 * and thus zero, it shrinks it, making zero size. The same story about
 119 * md_size. And this is course of problem when client waits for smaller number
 120 * of fields. This issue will be fixed later when client gets aware of RPC
 121 * layouts.  --umka
 122 */
 123static int mdc_getattr_common(struct obd_export *exp,
 124                              struct ptlrpc_request *req)
 125{
 126        struct req_capsule *pill = &req->rq_pill;
 127        struct mdt_body    *body;
 128        void           *eadata;
 129        int              rc;
 130
 131        /* Request message already built. */
 132        rc = ptlrpc_queue_wait(req);
 133        if (rc != 0)
 134                return rc;
 135
 136        /* sanity check for the reply */
 137        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
 138        if (!body)
 139                return -EPROTO;
 140
 141        CDEBUG(D_NET, "mode: %o\n", body->mbo_mode);
 142
 143        mdc_update_max_ea_from_body(exp, body);
 144        if (body->mbo_eadatasize != 0) {
 145                eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
 146                                                      body->mbo_eadatasize);
 147                if (!eadata)
 148                        return -EPROTO;
 149        }
 150
 151        return 0;
 152}
 153
 154static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
 155                       struct ptlrpc_request **request)
 156{
 157        struct ptlrpc_request *req;
 158        int                 rc;
 159
 160        /* Single MDS without an LMV case */
 161        if (op_data->op_flags & MF_GET_MDT_IDX) {
 162                op_data->op_mds = 0;
 163                return 0;
 164        }
 165        *request = NULL;
 166        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
 167        if (!req)
 168                return -ENOMEM;
 169
 170        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
 171        if (rc) {
 172                ptlrpc_request_free(req);
 173                return rc;
 174        }
 175
 176        mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
 177                      op_data->op_mode, -1, 0);
 178
 179        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 180                             op_data->op_mode);
 181        ptlrpc_request_set_replen(req);
 182
 183        rc = mdc_getattr_common(exp, req);
 184        if (rc)
 185                ptlrpc_req_finished(req);
 186        else
 187                *request = req;
 188        return rc;
 189}
 190
 191static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
 192                            struct ptlrpc_request **request)
 193{
 194        struct ptlrpc_request *req;
 195        int                 rc;
 196
 197        *request = NULL;
 198        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 199                                   &RQF_MDS_GETATTR_NAME);
 200        if (!req)
 201                return -ENOMEM;
 202
 203        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 204                             op_data->op_namelen + 1);
 205
 206        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
 207        if (rc) {
 208                ptlrpc_request_free(req);
 209                return rc;
 210        }
 211
 212        mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
 213                      op_data->op_mode, op_data->op_suppgids[0], 0);
 214
 215        if (op_data->op_name) {
 216                char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
 217
 218                LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
 219                                op_data->op_namelen);
 220                memcpy(name, op_data->op_name, op_data->op_namelen);
 221        }
 222
 223        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 224                             op_data->op_mode);
 225        ptlrpc_request_set_replen(req);
 226
 227        rc = mdc_getattr_common(exp, req);
 228        if (rc)
 229                ptlrpc_req_finished(req);
 230        else
 231                *request = req;
 232        return rc;
 233}
 234
 235static int mdc_xattr_common(struct obd_export *exp,
 236                            const struct req_format *fmt,
 237                            const struct lu_fid *fid,
 238                            int opcode, u64 valid,
 239                            const char *xattr_name, const char *input,
 240                            int input_size, int output_size, int flags,
 241                            __u32 suppgid, struct ptlrpc_request **request)
 242{
 243        struct ptlrpc_request *req;
 244        int   xattr_namelen = 0;
 245        char *tmp;
 246        int   rc;
 247
 248        *request = NULL;
 249        req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
 250        if (!req)
 251                return -ENOMEM;
 252
 253        if (xattr_name) {
 254                xattr_namelen = strlen(xattr_name) + 1;
 255                req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 256                                     xattr_namelen);
 257        }
 258        if (input_size) {
 259                LASSERT(input);
 260                req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
 261                                     input_size);
 262        }
 263
 264        /* Flush local XATTR locks to get rid of a possible cancel RPC */
 265        if (opcode == MDS_REINT && fid_is_sane(fid) &&
 266            exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) {
 267                LIST_HEAD(cancels);
 268                int count;
 269
 270                /* Without that packing would fail */
 271                if (input_size == 0)
 272                        req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
 273                                             RCL_CLIENT, 0);
 274
 275                count = mdc_resource_get_unused(exp, fid,
 276                                                &cancels, LCK_EX,
 277                                                MDS_INODELOCK_XATTR);
 278
 279                rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
 280                if (rc) {
 281                        ptlrpc_request_free(req);
 282                        return rc;
 283                }
 284        } else {
 285                rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
 286                if (rc) {
 287                        ptlrpc_request_free(req);
 288                        return rc;
 289                }
 290        }
 291
 292        if (opcode == MDS_REINT) {
 293                struct mdt_rec_setxattr *rec;
 294
 295                CLASSERT(sizeof(struct mdt_rec_setxattr) ==
 296                         sizeof(struct mdt_rec_reint));
 297                rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 298                rec->sx_opcode = REINT_SETXATTR;
 299                rec->sx_fsuid  = from_kuid(&init_user_ns, current_fsuid());
 300                rec->sx_fsgid  = from_kgid(&init_user_ns, current_fsgid());
 301                rec->sx_cap    = cfs_curproc_cap_pack();
 302                rec->sx_suppgid1 = suppgid;
 303                rec->sx_suppgid2 = -1;
 304                rec->sx_fid    = *fid;
 305                rec->sx_valid  = valid | OBD_MD_FLCTIME;
 306                rec->sx_time   = ktime_get_real_seconds();
 307                rec->sx_size   = output_size;
 308                rec->sx_flags  = flags;
 309
 310        } else {
 311                mdc_pack_body(req, fid, valid, output_size, suppgid, flags);
 312        }
 313
 314        if (xattr_name) {
 315                tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
 316                memcpy(tmp, xattr_name, xattr_namelen);
 317        }
 318        if (input_size) {
 319                tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
 320                memcpy(tmp, input, input_size);
 321        }
 322
 323        if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER))
 324                req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
 325                                     RCL_SERVER, output_size);
 326        ptlrpc_request_set_replen(req);
 327
 328        /* make rpc */
 329        if (opcode == MDS_REINT)
 330                mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 331
 332        rc = ptlrpc_queue_wait(req);
 333
 334        if (opcode == MDS_REINT)
 335                mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 336
 337        if (rc)
 338                ptlrpc_req_finished(req);
 339        else
 340                *request = req;
 341        return rc;
 342}
 343
 344static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
 345                        u64 valid, const char *xattr_name,
 346                        const char *input, int input_size, int output_size,
 347                        int flags, __u32 suppgid,
 348                        struct ptlrpc_request **request)
 349{
 350        return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
 351                                fid, MDS_REINT, valid, xattr_name,
 352                                input, input_size, output_size, flags,
 353                                suppgid, request);
 354}
 355
 356static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
 357                        u64 valid, const char *xattr_name,
 358                        const char *input, int input_size, int output_size,
 359                        int flags, struct ptlrpc_request **request)
 360{
 361        return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
 362                                fid, MDS_GETXATTR, valid, xattr_name,
 363                                input, input_size, output_size, flags,
 364                                -1, request);
 365}
 366
 367#ifdef CONFIG_FS_POSIX_ACL
 368static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
 369{
 370        struct req_capsule     *pill = &req->rq_pill;
 371        struct mdt_body *body = md->body;
 372        struct posix_acl       *acl;
 373        void               *buf;
 374        int                  rc;
 375
 376        if (!body->mbo_aclsize)
 377                return 0;
 378
 379        buf = req_capsule_server_sized_get(pill, &RMF_ACL, body->mbo_aclsize);
 380
 381        if (!buf)
 382                return -EPROTO;
 383
 384        acl = posix_acl_from_xattr(&init_user_ns, buf, body->mbo_aclsize);
 385        if (!acl)
 386                return 0;
 387
 388        if (IS_ERR(acl)) {
 389                rc = PTR_ERR(acl);
 390                CERROR("convert xattr to acl: %d\n", rc);
 391                return rc;
 392        }
 393
 394        rc = posix_acl_valid(&init_user_ns, acl);
 395        if (rc) {
 396                CERROR("validate acl: %d\n", rc);
 397                posix_acl_release(acl);
 398                return rc;
 399        }
 400
 401        md->posix_acl = acl;
 402        return 0;
 403}
 404#else
 405#define mdc_unpack_acl(req, md) 0
 406#endif
 407
 408static int mdc_get_lustre_md(struct obd_export *exp,
 409                             struct ptlrpc_request *req,
 410                             struct obd_export *dt_exp,
 411                             struct obd_export *md_exp,
 412                             struct lustre_md *md)
 413{
 414        struct req_capsule *pill = &req->rq_pill;
 415        int rc;
 416
 417        LASSERT(md);
 418        memset(md, 0, sizeof(*md));
 419
 420        md->body = req_capsule_server_get(pill, &RMF_MDT_BODY);
 421
 422        if (md->body->mbo_valid & OBD_MD_FLEASIZE) {
 423                int lmmsize;
 424                struct lov_mds_md *lmm;
 425
 426                if (!S_ISREG(md->body->mbo_mode)) {
 427                        CDEBUG(D_INFO,
 428                               "OBD_MD_FLEASIZE set, should be a regular file, but is not\n");
 429                        rc = -EPROTO;
 430                        goto out;
 431                }
 432
 433                if (md->body->mbo_eadatasize == 0) {
 434                        CDEBUG(D_INFO,
 435                               "OBD_MD_FLEASIZE set, but eadatasize 0\n");
 436                        rc = -EPROTO;
 437                        goto out;
 438                }
 439                lmmsize = md->body->mbo_eadatasize;
 440                lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize);
 441                if (!lmm) {
 442                        rc = -EPROTO;
 443                        goto out;
 444                }
 445
 446                rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
 447                if (rc < 0)
 448                        goto out;
 449
 450                if (rc < (typeof(rc))sizeof(*md->lsm)) {
 451                        CDEBUG(D_INFO,
 452                               "lsm size too small: rc < sizeof (*md->lsm) (%d < %d)\n",
 453                               rc, (int)sizeof(*md->lsm));
 454                        rc = -EPROTO;
 455                        goto out;
 456                }
 457
 458        } else if (md->body->mbo_valid & OBD_MD_FLDIREA) {
 459                int lmvsize;
 460                struct lov_mds_md *lmv;
 461
 462                if (!S_ISDIR(md->body->mbo_mode)) {
 463                        CDEBUG(D_INFO,
 464                               "OBD_MD_FLDIREA set, should be a directory, but is not\n");
 465                        rc = -EPROTO;
 466                        goto out;
 467                }
 468
 469                if (md->body->mbo_eadatasize == 0) {
 470                        CDEBUG(D_INFO,
 471                               "OBD_MD_FLDIREA is set, but eadatasize 0\n");
 472                        return -EPROTO;
 473                }
 474                if (md->body->mbo_valid & OBD_MD_MEA) {
 475                        lmvsize = md->body->mbo_eadatasize;
 476                        lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
 477                                                           lmvsize);
 478                        if (!lmv) {
 479                                rc = -EPROTO;
 480                                goto out;
 481                        }
 482
 483                        rc = obd_unpackmd(md_exp, (void *)&md->lmv, lmv,
 484                                          lmvsize);
 485                        if (rc < 0)
 486                                goto out;
 487
 488                        if (rc < (typeof(rc))sizeof(*md->lmv)) {
 489                                CDEBUG(D_INFO,
 490                                       "size too small: rc < sizeof(*md->lmv) (%d < %d)\n",
 491                                        rc, (int)sizeof(*md->lmv));
 492                                rc = -EPROTO;
 493                                goto out;
 494                        }
 495                }
 496        }
 497        rc = 0;
 498
 499        if (md->body->mbo_valid & OBD_MD_FLACL) {
 500                /* for ACL, it's possible that FLACL is set but aclsize is zero.
 501                 * only when aclsize != 0 there's an actual segment for ACL
 502                 * in reply buffer.
 503                 */
 504                if (md->body->mbo_aclsize) {
 505                        rc = mdc_unpack_acl(req, md);
 506                        if (rc)
 507                                goto out;
 508#ifdef CONFIG_FS_POSIX_ACL
 509                } else {
 510                        md->posix_acl = NULL;
 511#endif
 512                }
 513        }
 514
 515out:
 516        if (rc) {
 517#ifdef CONFIG_FS_POSIX_ACL
 518                posix_acl_release(md->posix_acl);
 519#endif
 520                if (md->lsm)
 521                        obd_free_memmd(dt_exp, &md->lsm);
 522        }
 523        return rc;
 524}
 525
 526static int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
 527{
 528        return 0;
 529}
 530
 531/**
 532 * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
 533 * RPC chains.
 534 */
 535void mdc_replay_open(struct ptlrpc_request *req)
 536{
 537        struct md_open_data *mod = req->rq_cb_data;
 538        struct ptlrpc_request *close_req;
 539        struct obd_client_handle *och;
 540        struct lustre_handle old;
 541        struct mdt_body *body;
 542
 543        if (!mod) {
 544                DEBUG_REQ(D_ERROR, req,
 545                          "Can't properly replay without open data.");
 546                return;
 547        }
 548
 549        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 550
 551        och = mod->mod_och;
 552        if (och) {
 553                struct lustre_handle *file_fh;
 554
 555                LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
 556
 557                file_fh = &och->och_fh;
 558                CDEBUG(D_HA, "updating handle from %#llx to %#llx\n",
 559                       file_fh->cookie, body->mbo_handle.cookie);
 560                old = *file_fh;
 561                *file_fh = body->mbo_handle;
 562        }
 563        close_req = mod->mod_close_req;
 564        if (close_req) {
 565                __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
 566                struct mdt_ioepoch *epoch;
 567
 568                LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
 569                epoch = req_capsule_client_get(&close_req->rq_pill,
 570                                               &RMF_MDT_EPOCH);
 571                LASSERT(epoch);
 572
 573                if (och)
 574                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
 575                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
 576                epoch->handle = body->mbo_handle;
 577        }
 578}
 579
 580void mdc_commit_open(struct ptlrpc_request *req)
 581{
 582        struct md_open_data *mod = req->rq_cb_data;
 583
 584        if (!mod)
 585                return;
 586
 587        /**
 588         * No need to touch md_open_data::mod_och, it holds a reference on
 589         * \var mod and will zero references to each other, \var mod will be
 590         * freed after that when md_open_data::mod_och will put the reference.
 591         */
 592
 593        /**
 594         * Do not let open request to disappear as it still may be needed
 595         * for close rpc to happen (it may happen on evict only, otherwise
 596         * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
 597         * called), just mark this rpc as committed to distinguish these 2
 598         * cases, see mdc_close() for details. The open request reference will
 599         * be put along with freeing \var mod.
 600         */
 601        ptlrpc_request_addref(req);
 602        spin_lock(&req->rq_lock);
 603        req->rq_committed = 1;
 604        spin_unlock(&req->rq_lock);
 605        req->rq_cb_data = NULL;
 606        obd_mod_put(mod);
 607}
 608
 609int mdc_set_open_replay_data(struct obd_export *exp,
 610                             struct obd_client_handle *och,
 611                             struct lookup_intent *it)
 612{
 613        struct md_open_data   *mod;
 614        struct mdt_rec_create *rec;
 615        struct mdt_body       *body;
 616        struct ptlrpc_request *open_req = it->it_request;
 617        struct obd_import     *imp = open_req->rq_import;
 618
 619        if (!open_req->rq_replay)
 620                return 0;
 621
 622        rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT);
 623        body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
 624        LASSERT(rec);
 625        /* Incoming message in my byte order (it's been swabbed). */
 626        /* Outgoing messages always in my byte order. */
 627        LASSERT(body);
 628
 629        /* Only if the import is replayable, we set replay_open data */
 630        if (och && imp->imp_replayable) {
 631                mod = obd_mod_alloc();
 632                if (!mod) {
 633                        DEBUG_REQ(D_ERROR, open_req,
 634                                  "Can't allocate md_open_data");
 635                        return 0;
 636                }
 637
 638                /**
 639                 * Take a reference on \var mod, to be freed on mdc_close().
 640                 * It protects \var mod from being freed on eviction (commit
 641                 * callback is called despite rq_replay flag).
 642                 * Another reference for \var och.
 643                 */
 644                obd_mod_get(mod);
 645                obd_mod_get(mod);
 646
 647                spin_lock(&open_req->rq_lock);
 648                och->och_mod = mod;
 649                mod->mod_och = och;
 650                mod->mod_is_create = it_disposition(it, DISP_OPEN_CREATE) ||
 651                                     it_disposition(it, DISP_OPEN_STRIPE);
 652                mod->mod_open_req = open_req;
 653                open_req->rq_cb_data = mod;
 654                open_req->rq_commit_cb = mdc_commit_open;
 655                spin_unlock(&open_req->rq_lock);
 656        }
 657
 658        rec->cr_fid2 = body->mbo_fid1;
 659        rec->cr_ioepoch = body->mbo_ioepoch;
 660        rec->cr_old_handle.cookie = body->mbo_handle.cookie;
 661        open_req->rq_replay_cb = mdc_replay_open;
 662        if (!fid_is_sane(&body->mbo_fid1)) {
 663                DEBUG_REQ(D_ERROR, open_req,
 664                          "Saving replay request with insane fid");
 665                LBUG();
 666        }
 667
 668        DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
 669        return 0;
 670}
 671
 672static void mdc_free_open(struct md_open_data *mod)
 673{
 674        int committed = 0;
 675
 676        if (mod->mod_is_create == 0 &&
 677            imp_connect_disp_stripe(mod->mod_open_req->rq_import))
 678                committed = 1;
 679
 680        /*
 681         * No reason to asssert here if the open request has
 682         * rq_replay == 1. It means that mdc_close failed, and
 683         * close request wasn`t sent. It is not fatal to client.
 684         * The worst thing is eviction if the client gets open lock
 685         */
 686        DEBUG_REQ(D_RPCTRACE, mod->mod_open_req,
 687                  "free open request rq_replay = %d\n",
 688                   mod->mod_open_req->rq_replay);
 689
 690        ptlrpc_request_committed(mod->mod_open_req, committed);
 691        if (mod->mod_close_req)
 692                ptlrpc_request_committed(mod->mod_close_req, committed);
 693}
 694
 695static int mdc_clear_open_replay_data(struct obd_export *exp,
 696                                      struct obd_client_handle *och)
 697{
 698        struct md_open_data *mod = och->och_mod;
 699
 700        /**
 701         * It is possible to not have \var mod in a case of eviction between
 702         * lookup and ll_file_open().
 703         **/
 704        if (!mod)
 705                return 0;
 706
 707        LASSERT(mod != LP_POISON);
 708        LASSERT(mod->mod_open_req);
 709        mdc_free_open(mod);
 710
 711        mod->mod_och = NULL;
 712        och->och_mod = NULL;
 713        obd_mod_put(mod);
 714
 715        return 0;
 716}
 717
 718/* Prepares the request for the replay by the given reply */
 719static void mdc_close_handle_reply(struct ptlrpc_request *req,
 720                                   struct md_op_data *op_data, int rc) {
 721        struct mdt_body  *repbody;
 722        struct mdt_ioepoch *epoch;
 723
 724        if (req && rc == -EAGAIN) {
 725                repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 726                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
 727
 728                epoch->flags |= MF_SOM_AU;
 729                if (repbody->mbo_valid & OBD_MD_FLGETATTRLOCK)
 730                        op_data->op_flags |= MF_GETATTR_LOCK;
 731        }
 732}
 733
 734static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 735                     struct md_open_data *mod, struct ptlrpc_request **request)
 736{
 737        struct obd_device     *obd = class_exp2obd(exp);
 738        struct ptlrpc_request *req;
 739        struct req_format     *req_fmt;
 740        int                    rc;
 741        int                    saved_rc = 0;
 742
 743        req_fmt = &RQF_MDS_CLOSE;
 744        if (op_data->op_bias & MDS_HSM_RELEASE) {
 745                req_fmt = &RQF_MDS_RELEASE_CLOSE;
 746
 747                /* allocate a FID for volatile file */
 748                rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
 749                if (rc < 0) {
 750                        CERROR("%s: "DFID" failed to allocate FID: %d\n",
 751                               obd->obd_name, PFID(&op_data->op_fid1), rc);
 752                        /* save the errcode and proceed to close */
 753                        saved_rc = rc;
 754                }
 755        }
 756
 757        *request = NULL;
 758        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_CLOSE))
 759                req = NULL;
 760        else
 761                req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt);
 762
 763        /* Ensure that this close's handle is fixed up during replay. */
 764        if (likely(mod)) {
 765                LASSERTF(mod->mod_open_req &&
 766                         mod->mod_open_req->rq_type != LI_POISON,
 767                         "POISONED open %p!\n", mod->mod_open_req);
 768
 769                mod->mod_close_req = req;
 770
 771                DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
 772                /* We no longer want to preserve this open for replay even
 773                 * though the open was committed. b=3632, b=3633
 774                 */
 775                spin_lock(&mod->mod_open_req->rq_lock);
 776                mod->mod_open_req->rq_replay = 0;
 777                spin_unlock(&mod->mod_open_req->rq_lock);
 778        } else {
 779                 CDEBUG(D_HA,
 780                        "couldn't find open req; expecting close error\n");
 781        }
 782        if (!req) {
 783                /*
 784                 * TODO: repeat close after errors
 785                 */
 786                CWARN("%s: close of FID "DFID" failed, file reference will be dropped when this client unmounts or is evicted\n",
 787                      obd->obd_name, PFID(&op_data->op_fid1));
 788                rc = -ENOMEM;
 789                goto out;
 790        }
 791
 792        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
 793        if (rc) {
 794                ptlrpc_request_free(req);
 795                goto out;
 796        }
 797
 798        /*
 799         * To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
 800         * portal whose threads are not taking any DLM locks and are therefore
 801         * always progressing
 802         */
 803        req->rq_request_portal = MDS_READPAGE_PORTAL;
 804        ptlrpc_at_set_req_timeout(req);
 805
 806        mdc_close_pack(req, op_data);
 807
 808        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
 809                             obd->u.cli.cl_default_mds_easize);
 810        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
 811                             obd->u.cli.cl_default_mds_cookiesize);
 812
 813        ptlrpc_request_set_replen(req);
 814
 815        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 816        rc = ptlrpc_queue_wait(req);
 817        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 818
 819        if (!req->rq_repmsg) {
 820                CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
 821                       req->rq_status);
 822                if (rc == 0)
 823                        rc = req->rq_status ?: -EIO;
 824        } else if (rc == 0 || rc == -EAGAIN) {
 825                struct mdt_body *body;
 826
 827                rc = lustre_msg_get_status(req->rq_repmsg);
 828                if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
 829                        DEBUG_REQ(D_ERROR, req,
 830                                  "type == PTL_RPC_MSG_ERR, err = %d", rc);
 831                        if (rc > 0)
 832                                rc = -rc;
 833                }
 834                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 835                if (!body)
 836                        rc = -EPROTO;
 837        } else if (rc == -ESTALE) {
 838                /**
 839                 * it can be allowed error after 3633 if open was committed and
 840                 * server failed before close was sent. Let's check if mod
 841                 * exists and return no error in that case
 842                 */
 843                if (mod) {
 844                        DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
 845                        if (mod->mod_open_req->rq_committed)
 846                                rc = 0;
 847                }
 848        }
 849
 850out:
 851        if (mod) {
 852                if (rc != 0)
 853                        mod->mod_close_req = NULL;
 854                /* Since now, mod is accessed through open_req only,
 855                 * thus close req does not keep a reference on mod anymore.
 856                 */
 857                obd_mod_put(mod);
 858        }
 859        *request = req;
 860        mdc_close_handle_reply(req, op_data, rc);
 861        return rc < 0 ? rc : saved_rc;
 862}
 863
 864static int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
 865                            struct md_open_data *mod)
 866{
 867        struct obd_device     *obd = class_exp2obd(exp);
 868        struct ptlrpc_request *req;
 869        int                 rc;
 870
 871        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 872                                   &RQF_MDS_DONE_WRITING);
 873        if (!req)
 874                return -ENOMEM;
 875
 876        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
 877        if (rc) {
 878                ptlrpc_request_free(req);
 879                return rc;
 880        }
 881
 882        if (mod) {
 883                LASSERTF(mod->mod_open_req &&
 884                         mod->mod_open_req->rq_type != LI_POISON,
 885                         "POISONED setattr %p!\n", mod->mod_open_req);
 886
 887                mod->mod_close_req = req;
 888                DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
 889                /* We no longer want to preserve this setattr for replay even
 890                 * though the open was committed. b=3632, b=3633
 891                 */
 892                spin_lock(&mod->mod_open_req->rq_lock);
 893                mod->mod_open_req->rq_replay = 0;
 894                spin_unlock(&mod->mod_open_req->rq_lock);
 895        }
 896
 897        mdc_close_pack(req, op_data);
 898        ptlrpc_request_set_replen(req);
 899
 900        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 901        rc = ptlrpc_queue_wait(req);
 902        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 903
 904        if (rc == -ESTALE) {
 905                /**
 906                 * it can be allowed error after 3633 if open or setattr were
 907                 * committed and server failed before close was sent.
 908                 * Let's check if mod exists and return no error in that case
 909                 */
 910                if (mod) {
 911                        if (mod->mod_open_req->rq_committed)
 912                                rc = 0;
 913                }
 914        }
 915
 916        if (mod) {
 917                if (rc != 0)
 918                        mod->mod_close_req = NULL;
 919                LASSERT(mod->mod_open_req);
 920                mdc_free_open(mod);
 921
 922                /* Since now, mod is accessed through setattr req only,
 923                 * thus DW req does not keep a reference on mod anymore.
 924                 */
 925                obd_mod_put(mod);
 926        }
 927
 928        mdc_close_handle_reply(req, op_data, rc);
 929        ptlrpc_req_finished(req);
 930        return rc;
 931}
 932
 933static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
 934                       u64 offset, struct page **pages, int npages,
 935                       struct ptlrpc_request **request)
 936{
 937        struct ptlrpc_bulk_desc *desc;
 938        struct ptlrpc_request *req;
 939        wait_queue_head_t waitq;
 940        struct l_wait_info lwi;
 941        int resends = 0;
 942        int rc;
 943        int i;
 944
 945        *request = NULL;
 946        init_waitqueue_head(&waitq);
 947
 948restart_bulk:
 949        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
 950        if (!req)
 951                return -ENOMEM;
 952
 953        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
 954        if (rc) {
 955                ptlrpc_request_free(req);
 956                return rc;
 957        }
 958
 959        req->rq_request_portal = MDS_READPAGE_PORTAL;
 960        ptlrpc_at_set_req_timeout(req);
 961
 962        desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
 963                                    MDS_BULK_PORTAL);
 964        if (!desc) {
 965                ptlrpc_request_free(req);
 966                return -ENOMEM;
 967        }
 968
 969        /* NB req now owns desc and will free it when it gets freed */
 970        for (i = 0; i < npages; i++)
 971                ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_SIZE);
 972
 973        mdc_readdir_pack(req, offset, PAGE_SIZE * npages, fid);
 974
 975        ptlrpc_request_set_replen(req);
 976        rc = ptlrpc_queue_wait(req);
 977        if (rc) {
 978                ptlrpc_req_finished(req);
 979                if (rc != -ETIMEDOUT)
 980                        return rc;
 981
 982                resends++;
 983                if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
 984                        CERROR("%s: too many resend retries: rc = %d\n",
 985                               exp->exp_obd->obd_name, -EIO);
 986                        return -EIO;
 987                }
 988                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
 989                                       NULL);
 990                l_wait_event(waitq, 0, &lwi);
 991
 992                goto restart_bulk;
 993        }
 994
 995        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
 996                                          req->rq_bulk->bd_nob_transferred);
 997        if (rc < 0) {
 998                ptlrpc_req_finished(req);
 999                return rc;
1000        }
1001
1002        if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
1003                CERROR("%s: unexpected bytes transferred: %d (%ld expected)\n",
1004                       exp->exp_obd->obd_name, req->rq_bulk->bd_nob_transferred,
1005                       PAGE_SIZE * npages);
1006                ptlrpc_req_finished(req);
1007                return -EPROTO;
1008        }
1009
1010        *request = req;
1011        return 0;
1012}
1013
1014static void mdc_release_page(struct page *page, int remove)
1015{
1016        if (remove) {
1017                lock_page(page);
1018                if (likely(page->mapping))
1019                        truncate_complete_page(page->mapping, page);
1020                unlock_page(page);
1021        }
1022        put_page(page);
1023}
1024
1025static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
1026                                    __u64 *start, __u64 *end, int hash64)
1027{
1028        /*
1029         * Complement of hash is used as an index so that
1030         * radix_tree_gang_lookup() can be used to find a page with starting
1031         * hash _smaller_ than one we are looking for.
1032         */
1033        unsigned long offset = hash_x_index(*hash, hash64);
1034        struct page *page;
1035        int found;
1036
1037        spin_lock_irq(&mapping->tree_lock);
1038        found = radix_tree_gang_lookup(&mapping->page_tree,
1039                                       (void **)&page, offset, 1);
1040        if (found > 0 && !radix_tree_exceptional_entry(page)) {
1041                struct lu_dirpage *dp;
1042
1043                get_page(page);
1044                spin_unlock_irq(&mapping->tree_lock);
1045                /*
1046                 * In contrast to find_lock_page() we are sure that directory
1047                 * page cannot be truncated (while DLM lock is held) and,
1048                 * hence, can avoid restart.
1049                 *
1050                 * In fact, page cannot be locked here at all, because
1051                 * mdc_read_page_remote does synchronous io.
1052                 */
1053                wait_on_page_locked(page);
1054                if (PageUptodate(page)) {
1055                        dp = kmap(page);
1056                        if (BITS_PER_LONG == 32 && hash64) {
1057                                *start = le64_to_cpu(dp->ldp_hash_start) >> 32;
1058                                *end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
1059                                *hash  = *hash >> 32;
1060                        } else {
1061                                *start = le64_to_cpu(dp->ldp_hash_start);
1062                                *end   = le64_to_cpu(dp->ldp_hash_end);
1063                        }
1064                        if (unlikely(*start == 1 && *hash == 0))
1065                                *hash = *start;
1066                        else
1067                                LASSERTF(*start <= *hash, "start = %#llx,end = %#llx,hash = %#llx\n",
1068                                         *start, *end, *hash);
1069                        CDEBUG(D_VFSTRACE, "offset %lx [%#llx %#llx], hash %#llx\n",
1070                               offset, *start, *end, *hash);
1071                        if (*hash > *end) {
1072                                kunmap(page);
1073                                mdc_release_page(page, 0);
1074                                page = NULL;
1075                        } else if (*end != *start && *hash == *end) {
1076                                /*
1077                                 * upon hash collision, remove this page,
1078                                 * otherwise put page reference, and
1079                                 * mdc_read_page_remote() will issue RPC to
1080                                 * fetch the page we want.
1081                                 */
1082                                kunmap(page);
1083                                mdc_release_page(page,
1084                                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1085                                page = NULL;
1086                        }
1087                } else {
1088                        put_page(page);
1089                        page = ERR_PTR(-EIO);
1090                }
1091        } else {
1092                spin_unlock_irq(&mapping->tree_lock);
1093                page = NULL;
1094        }
1095        return page;
1096}
1097
1098/*
1099 * Adjust a set of pages, each page containing an array of lu_dirpages,
1100 * so that each page can be used as a single logical lu_dirpage.
1101 *
1102 * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
1103 * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
1104 * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
1105 * value is used as a cookie to request the next lu_dirpage in a
1106 * directory listing that spans multiple pages (two in this example):
1107 *   ________
1108 *  |        |
1109 * .|--------v-------   -----.
1110 * |s|e|f|p|ent|ent| ... |ent|
1111 * '--|--------------   -----'   Each PAGE contains a single
1112 *    '------.                   lu_dirpage.
1113 * .---------v-------   -----.
1114 * |s|e|f|p|ent| 0 | ... | 0 |
1115 * '-----------------   -----'
1116 *
1117 * However, on hosts where the native VM page size (PAGE_SIZE) is
1118 * larger than LU_PAGE_SIZE, a single host page may contain multiple
1119 * lu_dirpages. After reading the lu_dirpages from the MDS, the
1120 * ldp_hash_end of the first lu_dirpage refers to the one immediately
1121 * after it in the same PAGE (arrows simplified for brevity, but
1122 * in general e0==s1, e1==s2, etc.):
1123 *
1124 * .--------------------   -----.
1125 * |s0|e0|f0|p|ent|ent| ... |ent|
1126 * |---v----------------   -----|
1127 * |s1|e1|f1|p|ent|ent| ... |ent|
1128 * |---v----------------   -----|  Here, each PAGE contains
1129 *             ...                 multiple lu_dirpages.
1130 * |---v----------------   -----|
1131 * |s'|e'|f'|p|ent|ent| ... |ent|
1132 * '---|----------------   -----'
1133 *     v
1134 * .----------------------------.
1135 * |        next PAGE           |
1136 *
1137 * This structure is transformed into a single logical lu_dirpage as follows:
1138 *
1139 * - Replace e0 with e' so the request for the next lu_dirpage gets the page
1140 *   labeled 'next PAGE'.
1141 *
1142 * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
1143 *   a hash collision with the next page exists.
1144 *
1145 * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
1146 *   to the first entry of the next lu_dirpage.
1147 */
#if PAGE_SIZE > LU_PAGE_SIZE
/*
 * Merge the lu_dirpages held in each of the @cfs_pgs host pages of @pages
 * into one logical lu_dirpage per host page.
 *
 * @pages:   host pages filled by the readpage RPC
 * @cfs_pgs: number of host pages to process
 * @lu_pgs:  total number of lu_dirpages contained in those host pages
 *
 * For every host page the first lu_dirpage header receives the end hash and
 * the LDF_COLLIDE flag of the last lu_dirpage in that page, and the last
 * entry of each inner lu_dirpage gets its lde_reclen extended so that it
 * reaches the first entry of the following lu_dirpage.
 *
 * Header fields are little-endian in memory: the values are converted to
 * CPU order when they are read and converted back when they are stored.
 */
static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs)
{
	int i;

	for (i = 0; i < cfs_pgs; i++) {
		struct lu_dirpage *dp = kmap(pages[i]);
		__u64 hash_end = le64_to_cpu(dp->ldp_hash_end);
		__u32 flags = le32_to_cpu(dp->ldp_flags);
		struct lu_dirpage *first = dp;
		struct lu_dirent *end_dirent = NULL;
		struct lu_dirent *ent;

		while (--lu_pgs > 0) {
			/* Walk to the last entry of this lu_dirpage. */
			ent = lu_dirent_start(dp);
			for (end_dirent = ent; ent;
			     end_dirent = ent, ent = lu_dirent_next(ent));

			/* Advance dp to next lu_dirpage. */
			dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);

			/* Check if we've reached the end of the CFS_PAGE. */
			if (!((unsigned long)dp & ~PAGE_MASK))
				break;

			/* Save the hash and flags of this lu_dirpage. */
			hash_end = le64_to_cpu(dp->ldp_hash_end);
			flags = le32_to_cpu(dp->ldp_flags);

			/* Check if lu_dirpage contains no entries. */
			if (!end_dirent)
				break;

			/*
			 * Enlarge the end entry lde_reclen from 0 to
			 * first entry of next lu_dirpage.
			 */
			LASSERT(!le16_to_cpu(end_dirent->lde_reclen));
			end_dirent->lde_reclen =
				cpu_to_le16((char *)(dp->ldp_entries) -
					    (char *)end_dirent);
		}

		/*
		 * hash_end and flags are in CPU byte order here, so they
		 * must be converted back before being written to the header.
		 */
		first->ldp_hash_end = cpu_to_le64(hash_end);
		first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
		first->ldp_flags |= cpu_to_le32(flags & LDF_COLLIDE);

		kunmap(pages[i]);
	}
	LASSERTF(lu_pgs == 0, "left = %d", lu_pgs);
}
#else
#define mdc_adjust_dirpages(pages, cfs_pgs, lu_pgs) do {} while (0)
#endif  /* PAGE_SIZE > LU_PAGE_SIZE */
1202
1203/* parameters for readdir page */
1204struct readpage_param {
1205        struct md_op_data       *rp_mod;
1206        __u64                   rp_off;
1207        int                     rp_hash64;
1208        struct obd_export       *rp_exp;
1209        struct md_callback      *rp_cb;
1210};
1211
1212/**
1213 * Read pages from server.
1214 *
1215 * Page in MDS_READPAGE RPC is packed in LU_PAGE_SIZE, and each page contains
1216 * a header lu_dirpage which describes the start/end hash, and whether this
1217 * page is empty (contains no dir entry) or hash collide with next page.
1218 * After client receives reply, several pages will be integrated into dir page
1219 * in PAGE_SIZE (if PAGE_SIZE greater than LU_PAGE_SIZE), and the
1220 * lu_dirpage for this integrated page will be adjusted.
1221 **/
1222static int mdc_read_page_remote(void *data, struct page *page0)
1223{
1224        struct readpage_param *rp = data;
1225        struct page **page_pool;
1226        struct page *page;
1227        struct lu_dirpage *dp;
1228        int rd_pgs = 0; /* number of pages read actually */
1229        int npages;
1230        struct md_op_data *op_data = rp->rp_mod;
1231        struct ptlrpc_request *req;
1232        int max_pages = op_data->op_max_pages;
1233        struct inode *inode;
1234        struct lu_fid *fid;
1235        int i;
1236        int rc;
1237
1238        LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
1239        inode = op_data->op_data;
1240        fid = &op_data->op_fid1;
1241        LASSERT(inode);
1242
1243        page_pool = kcalloc(max_pages, sizeof(page), GFP_NOFS);
1244        if (page_pool) {
1245                page_pool[0] = page0;
1246        } else {
1247                page_pool = &page0;
1248                max_pages = 1;
1249        }
1250
1251        for (npages = 1; npages < max_pages; npages++) {
1252                page = page_cache_alloc_cold(inode->i_mapping);
1253                if (!page)
1254                        break;
1255                page_pool[npages] = page;
1256        }
1257
1258        rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, page_pool, npages, &req);
1259        if (!rc) {
1260                int lu_pgs = req->rq_bulk->bd_nob_transferred;
1261
1262                rd_pgs = (req->rq_bulk->bd_nob_transferred +
1263                          PAGE_SIZE - 1) >> PAGE_SHIFT;
1264                lu_pgs >>= LU_PAGE_SHIFT;
1265                LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
1266
1267                CDEBUG(D_INODE, "read %d(%d) pages\n", rd_pgs, lu_pgs);
1268
1269                mdc_adjust_dirpages(page_pool, rd_pgs, lu_pgs);
1270
1271                SetPageUptodate(page0);
1272        }
1273
1274        unlock_page(page0);
1275        ptlrpc_req_finished(req);
1276        CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages);
1277        for (i = 1; i < npages; i++) {
1278                unsigned long offset;
1279                __u64 hash;
1280                int ret;
1281
1282                page = page_pool[i];
1283
1284                if (rc < 0 || i >= rd_pgs) {
1285                        put_page(page);
1286                        continue;
1287                }
1288
1289                SetPageUptodate(page);
1290
1291                dp = kmap(page);
1292                hash = le64_to_cpu(dp->ldp_hash_start);
1293                kunmap(page);
1294
1295                offset = hash_x_index(hash, rp->rp_hash64);
1296
1297                prefetchw(&page->flags);
1298                ret = add_to_page_cache_lru(page, inode->i_mapping, offset,
1299                                            GFP_KERNEL);
1300                if (!ret)
1301                        unlock_page(page);
1302                else
1303                        CDEBUG(D_VFSTRACE, "page %lu add to page cache failed: rc = %d\n",
1304                               offset, ret);
1305                put_page(page);
1306        }
1307
1308        if (page_pool != &page0)
1309                kfree(page_pool);
1310
1311        return rc;
1312}
1313
1314/**
1315 * Read dir page from cache first, if it can not find it, read it from
1316 * server and add into the cache.
1317 *
1318 * \param[in] exp       MDC export
1319 * \param[in] op_data   client MD stack parameters, transferring parameters
1320 *                      between different layers on client MD stack.
1321 * \param[in] cb_op     callback required for ldlm lock enqueue during
1322 *                      read page
1323 * \param[in] hash_offset the hash offset of the page to be read
1324 * \param[in] ppage     the page to be read
1325 *
1326 * retval               = 0 get the page successfully
1327 *                      errno(<0) get the page failed
1328 */
1329static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
1330                         struct md_callback *cb_op, __u64 hash_offset,
1331                         struct page **ppage)
1332{
1333        struct lookup_intent it = { .it_op = IT_READDIR };
1334        struct page *page;
1335        struct inode *dir = op_data->op_data;
1336        struct address_space *mapping;
1337        struct lu_dirpage *dp;
1338        __u64 start = 0;
1339        __u64 end = 0;
1340        struct lustre_handle lockh;
1341        struct ptlrpc_request *enq_req = NULL;
1342        struct readpage_param rp_param;
1343        int rc;
1344
1345        *ppage = NULL;
1346
1347        LASSERT(dir);
1348        mapping = dir->i_mapping;
1349
1350        rc = mdc_intent_lock(exp, op_data, &it, &enq_req,
1351                             cb_op->md_blocking_ast, 0);
1352        if (enq_req)
1353                ptlrpc_req_finished(enq_req);
1354
1355        if (rc < 0) {
1356                CERROR("%s: "DFID" lock enqueue fails: rc = %d\n",
1357                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rc);
1358                return rc;
1359        }
1360
1361        rc = 0;
1362        lockh.cookie = it.it_lock_handle;
1363        mdc_set_lock_data(exp, &lockh, dir, NULL);
1364
1365        rp_param.rp_off = hash_offset;
1366        rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64;
1367        page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
1368                               rp_param.rp_hash64);
1369        if (IS_ERR(page)) {
1370                CDEBUG(D_INFO, "%s: dir page locate: " DFID " at %llu: rc %ld\n",
1371                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1372                       rp_param.rp_off, PTR_ERR(page));
1373                rc = PTR_ERR(page);
1374                goto out_unlock;
1375        } else if (page) {
1376                /*
1377                 * XXX nikita: not entirely correct handling of a corner case:
1378                 * suppose hash chain of entries with hash value HASH crosses
1379                 * border between pages P0 and P1. First both P0 and P1 are
1380                 * cached, seekdir() is called for some entry from the P0 part
1381                 * of the chain. Later P0 goes out of cache. telldir(HASH)
1382                 * happens and finds P1, as it starts with matching hash
1383                 * value. Remaining entries from P0 part of the chain are
1384                 * skipped. (Is that really a bug?)
1385                 *
1386                 * Possible solutions: 0. don't cache P1 is such case, handle
1387                 * it as an "overflow" page. 1. invalidate all pages at
1388                 * once. 2. use HASH|1 as an index for P1.
1389                 */
1390                goto hash_collision;
1391        }
1392
1393        rp_param.rp_exp = exp;
1394        rp_param.rp_mod = op_data;
1395        page = read_cache_page(mapping,
1396                               hash_x_index(rp_param.rp_off,
1397                                            rp_param.rp_hash64),
1398                               mdc_read_page_remote, &rp_param);
1399        if (IS_ERR(page)) {
1400                CERROR("%s: read cache page: "DFID" at %llu: rc %ld\n",
1401                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1402                       rp_param.rp_off, PTR_ERR(page));
1403                rc = PTR_ERR(page);
1404                goto out_unlock;
1405        }
1406
1407        wait_on_page_locked(page);
1408        (void)kmap(page);
1409        if (!PageUptodate(page)) {
1410                CERROR("%s: page not updated: "DFID" at %llu: rc %d\n",
1411                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1412                       rp_param.rp_off, -5);
1413                goto fail;
1414        }
1415        if (!PageChecked(page))
1416                SetPageChecked(page);
1417        if (PageError(page)) {
1418                CERROR("%s: page error: "DFID" at %llu: rc %d\n",
1419                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1420                       rp_param.rp_off, -5);
1421                goto fail;
1422        }
1423
1424hash_collision:
1425        dp = page_address(page);
1426        if (BITS_PER_LONG == 32 && rp_param.rp_hash64) {
1427                start = le64_to_cpu(dp->ldp_hash_start) >> 32;
1428                end = le64_to_cpu(dp->ldp_hash_end) >> 32;
1429                rp_param.rp_off = hash_offset >> 32;
1430        } else {
1431                start = le64_to_cpu(dp->ldp_hash_start);
1432                end = le64_to_cpu(dp->ldp_hash_end);
1433                rp_param.rp_off = hash_offset;
1434        }
1435        if (end == start) {
1436                LASSERT(start == rp_param.rp_off);
1437                CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
1438#if BITS_PER_LONG == 32
1439                CWARN("Real page-wide hash collision at [%llu %llu] with hash %llu\n",
1440                      le64_to_cpu(dp->ldp_hash_start),
1441                      le64_to_cpu(dp->ldp_hash_end), hash_offset);
1442#endif
1443                /*
1444                 * Fetch whole overflow chain...
1445                 *
1446                 * XXX not yet.
1447                 */
1448                goto fail;
1449        }
1450        *ppage = page;
1451out_unlock:
1452        ldlm_lock_decref(&lockh, it.it_lock_mode);
1453        return rc;
1454fail:
1455        kunmap(page);
1456        mdc_release_page(page, 1);
1457        rc = -EIO;
1458        goto out_unlock;
1459}
1460
1461static int mdc_statfs(const struct lu_env *env,
1462                      struct obd_export *exp, struct obd_statfs *osfs,
1463                      __u64 max_age, __u32 flags)
1464{
1465        struct obd_device     *obd = class_exp2obd(exp);
1466        struct ptlrpc_request *req;
1467        struct obd_statfs     *msfs;
1468        struct obd_import     *imp = NULL;
1469        int                 rc;
1470
1471        /*
1472         * Since the request might also come from lprocfs, so we need
1473         * sync this with client_disconnect_export Bug15684
1474         */
1475        down_read(&obd->u.cli.cl_sem);
1476        if (obd->u.cli.cl_import)
1477                imp = class_import_get(obd->u.cli.cl_import);
1478        up_read(&obd->u.cli.cl_sem);
1479        if (!imp)
1480                return -ENODEV;
1481
1482        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS,
1483                                        LUSTRE_MDS_VERSION, MDS_STATFS);
1484        if (!req) {
1485                rc = -ENOMEM;
1486                goto output;
1487        }
1488
1489        ptlrpc_request_set_replen(req);
1490
1491        if (flags & OBD_STATFS_NODELAY) {
1492                /* procfs requests not want stay in wait for avoid deadlock */
1493                req->rq_no_resend = 1;
1494                req->rq_no_delay = 1;
1495        }
1496
1497        rc = ptlrpc_queue_wait(req);
1498        if (rc) {
1499                /* check connection error first */
1500                if (imp->imp_connect_error)
1501                        rc = imp->imp_connect_error;
1502                goto out;
1503        }
1504
1505        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1506        if (!msfs) {
1507                rc = -EPROTO;
1508                goto out;
1509        }
1510
1511        *osfs = *msfs;
1512out:
1513        ptlrpc_req_finished(req);
1514output:
1515        class_import_put(imp);
1516        return rc;
1517}
1518
1519static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
1520{
1521        __u32 keylen, vallen;
1522        void *key;
1523        int rc;
1524
1525        if (gf->gf_pathlen > PATH_MAX)
1526                return -ENAMETOOLONG;
1527        if (gf->gf_pathlen < 2)
1528                return -EOVERFLOW;
1529
1530        /* Key is KEY_FID2PATH + getinfo_fid2path description */
1531        keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf);
1532        key = kzalloc(keylen, GFP_NOFS);
1533        if (!key)
1534                return -ENOMEM;
1535        memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH));
1536        memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf));
1537
1538        CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
1539               PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno);
1540
1541        if (!fid_is_sane(&gf->gf_fid)) {
1542                rc = -EINVAL;
1543                goto out;
1544        }
1545
1546        /* Val is struct getinfo_fid2path result plus path */
1547        vallen = sizeof(*gf) + gf->gf_pathlen;
1548
1549        rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
1550        if (rc != 0 && rc != -EREMOTE)
1551                goto out;
1552
1553        if (vallen <= sizeof(*gf)) {
1554                rc = -EPROTO;
1555                goto out;
1556        } else if (vallen > sizeof(*gf) + gf->gf_pathlen) {
1557                rc = -EOVERFLOW;
1558                goto out;
1559        }
1560
1561        CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n%s\n",
1562               PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path);
1563
1564out:
1565        kfree(key);
1566        return rc;
1567}
1568
1569static int mdc_ioc_hsm_progress(struct obd_export *exp,
1570                                struct hsm_progress_kernel *hpk)
1571{
1572        struct obd_import               *imp = class_exp2cliimp(exp);
1573        struct hsm_progress_kernel      *req_hpk;
1574        struct ptlrpc_request           *req;
1575        int                              rc;
1576
1577        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS,
1578                                        LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS);
1579        if (!req) {
1580                rc = -ENOMEM;
1581                goto out;
1582        }
1583
1584        mdc_pack_body(req, NULL, 0, 0, -1, 0);
1585
1586        /* Copy hsm_progress struct */
1587        req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
1588        if (!req_hpk) {
1589                rc = -EPROTO;
1590                goto out;
1591        }
1592
1593        *req_hpk = *hpk;
1594        req_hpk->hpk_errval = lustre_errno_hton(hpk->hpk_errval);
1595
1596        ptlrpc_request_set_replen(req);
1597
1598        rc = mdc_queue_wait(req);
1599out:
1600        ptlrpc_req_finished(req);
1601        return rc;
1602}
1603
1604static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
1605{
1606        __u32                   *archive_mask;
1607        struct ptlrpc_request   *req;
1608        int                      rc;
1609
1610        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER,
1611                                        LUSTRE_MDS_VERSION,
1612                                        MDS_HSM_CT_REGISTER);
1613        if (!req) {
1614                rc = -ENOMEM;
1615                goto out;
1616        }
1617
1618        mdc_pack_body(req, NULL, 0, 0, -1, 0);
1619
1620        /* Copy hsm_progress struct */
1621        archive_mask = req_capsule_client_get(&req->rq_pill,
1622                                              &RMF_MDS_HSM_ARCHIVE);
1623        if (!archive_mask) {
1624                rc = -EPROTO;
1625                goto out;
1626        }
1627
1628        *archive_mask = archives;
1629
1630        ptlrpc_request_set_replen(req);
1631
1632        rc = mdc_queue_wait(req);
1633out:
1634        ptlrpc_req_finished(req);
1635        return rc;
1636}
1637
1638static int mdc_ioc_hsm_current_action(struct obd_export *exp,
1639                                      struct md_op_data *op_data)
1640{
1641        struct hsm_current_action       *hca = op_data->op_data;
1642        struct hsm_current_action       *req_hca;
1643        struct ptlrpc_request           *req;
1644        int                              rc;
1645
1646        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1647                                   &RQF_MDS_HSM_ACTION);
1648        if (!req)
1649                return -ENOMEM;
1650
1651        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
1652        if (rc) {
1653                ptlrpc_request_free(req);
1654                return rc;
1655        }
1656
1657        mdc_pack_body(req, &op_data->op_fid1, 0, 0,
1658                      op_data->op_suppgids[0], 0);
1659
1660        ptlrpc_request_set_replen(req);
1661
1662        rc = mdc_queue_wait(req);
1663        if (rc)
1664                goto out;
1665
1666        req_hca = req_capsule_server_get(&req->rq_pill,
1667                                         &RMF_MDS_HSM_CURRENT_ACTION);
1668        if (!req_hca) {
1669                rc = -EPROTO;
1670                goto out;
1671        }
1672
1673        *hca = *req_hca;
1674
1675out:
1676        ptlrpc_req_finished(req);
1677        return rc;
1678}
1679
1680static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
1681{
1682        struct ptlrpc_request   *req;
1683        int                      rc;
1684
1685        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_UNREGISTER,
1686                                        LUSTRE_MDS_VERSION,
1687                                        MDS_HSM_CT_UNREGISTER);
1688        if (!req) {
1689                rc = -ENOMEM;
1690                goto out;
1691        }
1692
1693        mdc_pack_body(req, NULL, 0, 0, -1, 0);
1694
1695        ptlrpc_request_set_replen(req);
1696
1697        rc = mdc_queue_wait(req);
1698out:
1699        ptlrpc_req_finished(req);
1700        return rc;
1701}
1702
1703static int mdc_ioc_hsm_state_get(struct obd_export *exp,
1704                                 struct md_op_data *op_data)
1705{
1706        struct hsm_user_state   *hus = op_data->op_data;
1707        struct hsm_user_state   *req_hus;
1708        struct ptlrpc_request   *req;
1709        int                      rc;
1710
1711        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1712                                   &RQF_MDS_HSM_STATE_GET);
1713        if (!req)
1714                return -ENOMEM;
1715
1716        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
1717        if (rc != 0) {
1718                ptlrpc_request_free(req);
1719                return rc;
1720        }
1721
1722        mdc_pack_body(req, &op_data->op_fid1, 0, 0,
1723                      op_data->op_suppgids[0], 0);
1724
1725        ptlrpc_request_set_replen(req);
1726
1727        rc = mdc_queue_wait(req);
1728        if (rc)
1729                goto out;
1730
1731        req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
1732        if (!req_hus) {
1733                rc = -EPROTO;
1734                goto out;
1735        }
1736
1737        *hus = *req_hus;
1738
1739out:
1740        ptlrpc_req_finished(req);
1741        return rc;
1742}
1743
1744static int mdc_ioc_hsm_state_set(struct obd_export *exp,
1745                                 struct md_op_data *op_data)
1746{
1747        struct hsm_state_set    *hss = op_data->op_data;
1748        struct hsm_state_set    *req_hss;
1749        struct ptlrpc_request   *req;
1750        int                      rc;
1751
1752        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1753                                   &RQF_MDS_HSM_STATE_SET);
1754        if (!req)
1755                return -ENOMEM;
1756
1757        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
1758        if (rc) {
1759                ptlrpc_request_free(req);
1760                return rc;
1761        }
1762
1763        mdc_pack_body(req, &op_data->op_fid1, 0, 0,
1764                      op_data->op_suppgids[0], 0);
1765
1766        /* Copy states */
1767        req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
1768        if (!req_hss) {
1769                rc = -EPROTO;
1770                goto out;
1771        }
1772        *req_hss = *hss;
1773
1774        ptlrpc_request_set_replen(req);
1775
1776        rc = mdc_queue_wait(req);
1777out:
1778        ptlrpc_req_finished(req);
1779        return rc;
1780}
1781
1782static int mdc_ioc_hsm_request(struct obd_export *exp,
1783                               struct hsm_user_request *hur)
1784{
1785        struct obd_import       *imp = class_exp2cliimp(exp);
1786        struct ptlrpc_request   *req;
1787        struct hsm_request      *req_hr;
1788        struct hsm_user_item    *req_hui;
1789        char                    *req_opaque;
1790        int                      rc;
1791
1792        req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
1793        if (!req) {
1794                rc = -ENOMEM;
1795                goto out;
1796        }
1797
1798        req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
1799                             hur->hur_request.hr_itemcount
1800                             * sizeof(struct hsm_user_item));
1801        req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
1802                             hur->hur_request.hr_data_len);
1803
1804        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
1805        if (rc) {
1806                ptlrpc_request_free(req);
1807                return rc;
1808        }
1809
1810        mdc_pack_body(req, NULL, 0, 0, -1, 0);
1811
1812        /* Copy hsm_request struct */
1813        req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
1814        if (!req_hr) {
1815                rc = -EPROTO;
1816                goto out;
1817        }
1818        *req_hr = hur->hur_request;
1819
1820        /* Copy hsm_user_item structs */
1821        req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
1822        if (!req_hui) {
1823                rc = -EPROTO;
1824                goto out;
1825        }
1826        memcpy(req_hui, hur->hur_user_item,
1827               hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
1828
1829        /* Copy opaque field */
1830        req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
1831        if (!req_opaque) {
1832                rc = -EPROTO;
1833                goto out;
1834        }
1835        memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
1836
1837        ptlrpc_request_set_replen(req);
1838
1839        rc = mdc_queue_wait(req);
1840out:
1841        ptlrpc_req_finished(req);
1842        return rc;
1843}
1844
1845static struct kuc_hdr *changelog_kuc_hdr(char *buf, size_t len, u32 flags)
1846{
1847        struct kuc_hdr *lh = (struct kuc_hdr *)buf;
1848
1849        LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE);
1850
1851        lh->kuc_magic = KUC_MAGIC;
1852        lh->kuc_transport = KUC_TRANSPORT_CHANGELOG;
1853        lh->kuc_flags = flags;
1854        lh->kuc_msgtype = CL_RECORD;
1855        lh->kuc_msglen = len;
1856        return lh;
1857}
1858
1859struct changelog_show {
1860        __u64           cs_startrec;
1861        enum changelog_send_flag        cs_flags;
1862        struct file     *cs_fp;
1863        char            *cs_buf;
1864        struct obd_device *cs_obd;
1865};
1866
1867static inline char *cs_obd_name(struct changelog_show *cs)
1868{
1869        return cs->cs_obd->obd_name;
1870}
1871
1872static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
1873                             struct llog_rec_hdr *hdr, void *data)
1874{
1875        struct changelog_show *cs = data;
1876        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
1877        struct kuc_hdr *lh;
1878        size_t len;
1879        int rc;
1880
1881        if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
1882                rc = -EINVAL;
1883                CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
1884                       cs_obd_name(cs), rec->cr_hdr.lrh_type,
1885                       rec->cr.cr_type, rc);
1886                return rc;
1887        }
1888
1889        if (rec->cr.cr_index < cs->cs_startrec) {
1890                /* Skip entries earlier than what we are interested in */
1891                CDEBUG(D_HSM, "rec=%llu start=%llu\n",
1892                       rec->cr.cr_index, cs->cs_startrec);
1893                return 0;
1894        }
1895
1896        CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t=" DFID " p=" DFID
1897                " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
1898                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
1899                rec->cr.cr_flags & CLF_FLAGMASK,
1900                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
1901                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
1902
1903        len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
1904
1905        /* Set up the message */
1906        lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags);
1907        memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
1908
1909        rc = libcfs_kkuc_msg_put(cs->cs_fp, lh);
1910        CDEBUG(D_HSM, "kucmsg fp %p len %zu rc %d\n", cs->cs_fp, len, rc);
1911
1912        return rc;
1913}
1914
1915static int mdc_changelog_send_thread(void *csdata)
1916{
1917        enum llog_flag flags = LLOG_F_IS_CAT;
1918        struct changelog_show *cs = csdata;
1919        struct llog_ctxt *ctxt = NULL;
1920        struct llog_handle *llh = NULL;
1921        struct kuc_hdr *kuch;
1922        int rc;
1923
1924        CDEBUG(D_HSM, "changelog to fp=%p start %llu\n",
1925               cs->cs_fp, cs->cs_startrec);
1926
1927        cs->cs_buf = kzalloc(KUC_CHANGELOG_MSG_MAXSIZE, GFP_NOFS);
1928        if (!cs->cs_buf) {
1929                rc = -ENOMEM;
1930                goto out;
1931        }
1932
1933        /* Set up the remote catalog handle */
1934        ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
1935        if (!ctxt) {
1936                rc = -ENOENT;
1937                goto out;
1938        }
1939        rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
1940                       LLOG_OPEN_EXISTS);
1941        if (rc) {
1942                CERROR("%s: fail to open changelog catalog: rc = %d\n",
1943                       cs_obd_name(cs), rc);
1944                goto out;
1945        }
1946
1947        if (cs->cs_flags & CHANGELOG_FLAG_JOBID)
1948                flags |= LLOG_F_EXT_JOBID;
1949
1950        rc = llog_init_handle(NULL, llh, flags, NULL);
1951        if (rc) {
1952                CERROR("llog_init_handle failed %d\n", rc);
1953                goto out;
1954        }
1955
1956        rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
1957
1958        /* Send EOF no matter what our result */
1959        kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags);
1960        if (kuch) {
1961                kuch->kuc_msgtype = CL_EOF;
1962                libcfs_kkuc_msg_put(cs->cs_fp, kuch);
1963        }
1964
1965out:
1966        fput(cs->cs_fp);
1967        if (llh)
1968                llog_cat_close(NULL, llh);
1969        if (ctxt)
1970                llog_ctxt_put(ctxt);
1971        kfree(cs->cs_buf);
1972        kfree(cs);
1973        return rc;
1974}
1975
1976static int mdc_ioc_changelog_send(struct obd_device *obd,
1977                                  struct ioc_changelog *icc)
1978{
1979        struct changelog_show *cs;
1980        struct task_struct *task;
1981        int rc;
1982
1983        /* Freed in mdc_changelog_send_thread */
1984        cs = kzalloc(sizeof(*cs), GFP_NOFS);
1985        if (!cs)
1986                return -ENOMEM;
1987
1988        cs->cs_obd = obd;
1989        cs->cs_startrec = icc->icc_recno;
1990        /* matching fput in mdc_changelog_send_thread */
1991        cs->cs_fp = fget(icc->icc_id);
1992        cs->cs_flags = icc->icc_flags;
1993
1994        /*
1995         * New thread because we should return to user app before
1996         * writing into our pipe
1997         */
1998        task = kthread_run(mdc_changelog_send_thread, cs,
1999                           "mdc_clg_send_thread");
2000        if (IS_ERR(task)) {
2001                rc = PTR_ERR(task);
2002                CERROR("%s: can't start changelog thread: rc = %d\n",
2003                       cs_obd_name(cs), rc);
2004                kfree(cs);
2005        } else {
2006                rc = 0;
2007                CDEBUG(D_HSM, "%s: started changelog thread\n",
2008                       cs_obd_name(cs));
2009        }
2010
2011        CERROR("Failed to start changelog thread: %d\n", rc);
2012        return rc;
2013}
2014
2015static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
2016                                struct lustre_kernelcomm *lk);
2017
2018static int mdc_quotacheck(struct obd_device *unused, struct obd_export *exp,
2019                          struct obd_quotactl *oqctl)
2020{
2021        struct client_obd       *cli = &exp->exp_obd->u.cli;
2022        struct ptlrpc_request   *req;
2023        struct obd_quotactl     *body;
2024        int                   rc;
2025
2026        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
2027                                        &RQF_MDS_QUOTACHECK, LUSTRE_MDS_VERSION,
2028                                        MDS_QUOTACHECK);
2029        if (!req)
2030                return -ENOMEM;
2031
2032        body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
2033        *body = *oqctl;
2034
2035        ptlrpc_request_set_replen(req);
2036
2037        /* the next poll will find -ENODATA, that means quotacheck is
2038         * going on
2039         */
2040        cli->cl_qchk_stat = -ENODATA;
2041        rc = ptlrpc_queue_wait(req);
2042        if (rc)
2043                cli->cl_qchk_stat = rc;
2044        ptlrpc_req_finished(req);
2045        return rc;
2046}
2047
2048static int mdc_quota_poll_check(struct obd_export *exp,
2049                                struct if_quotacheck *qchk)
2050{
2051        struct client_obd *cli = &exp->exp_obd->u.cli;
2052        int rc;
2053
2054        qchk->obd_uuid = cli->cl_target_uuid;
2055        memcpy(qchk->obd_type, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME));
2056
2057        rc = cli->cl_qchk_stat;
2058        /* the client is not the previous one */
2059        if (rc == CL_NOT_QUOTACHECKED)
2060                rc = -EINTR;
2061        return rc;
2062}
2063
2064static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
2065                        struct obd_quotactl *oqctl)
2066{
2067        struct ptlrpc_request   *req;
2068        struct obd_quotactl     *oqc;
2069        int                   rc;
2070
2071        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
2072                                        &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION,
2073                                        MDS_QUOTACTL);
2074        if (!req)
2075                return -ENOMEM;
2076
2077        oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
2078        *oqc = *oqctl;
2079
2080        ptlrpc_request_set_replen(req);
2081        ptlrpc_at_set_req_timeout(req);
2082        req->rq_no_resend = 1;
2083
2084        rc = ptlrpc_queue_wait(req);
2085        if (rc)
2086                CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
2087
2088        if (req->rq_repmsg) {
2089                oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
2090                if (oqc) {
2091                        *oqctl = *oqc;
2092                } else if (!rc) {
2093                        CERROR("Can't unpack obd_quotactl\n");
2094                        rc = -EPROTO;
2095                }
2096        } else if (!rc) {
2097                CERROR("Can't unpack obd_quotactl\n");
2098                rc = -EPROTO;
2099        }
2100        ptlrpc_req_finished(req);
2101
2102        return rc;
2103}
2104
2105static int mdc_ioc_swap_layouts(struct obd_export *exp,
2106                                struct md_op_data *op_data)
2107{
2108        LIST_HEAD(cancels);
2109        struct ptlrpc_request   *req;
2110        int                      rc, count;
2111        struct mdc_swap_layouts *msl, *payload;
2112
2113        msl = op_data->op_data;
2114
2115        /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
2116         * first thing it will do is to cancel the 2 layout
2117         * locks hold by this client.
2118         * So the client must cancel its layout locks on the 2 fids
2119         * with the request RPC to avoid extra RPC round trips
2120         */
2121        count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
2122                                        LCK_CR, MDS_INODELOCK_LAYOUT |
2123                                        MDS_INODELOCK_XATTR);
2124        count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
2125                                         LCK_CR, MDS_INODELOCK_LAYOUT |
2126                                         MDS_INODELOCK_XATTR);
2127
2128        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2129                                   &RQF_MDS_SWAP_LAYOUTS);
2130        if (!req) {
2131                ldlm_lock_list_put(&cancels, l_bl_ast, count);
2132                return -ENOMEM;
2133        }
2134
2135        rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
2136        if (rc) {
2137                ptlrpc_request_free(req);
2138                return rc;
2139        }
2140
2141        mdc_swap_layouts_pack(req, op_data);
2142
2143        payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
2144        LASSERT(payload);
2145
2146        *payload = *msl;
2147
2148        ptlrpc_request_set_replen(req);
2149
2150        rc = ptlrpc_queue_wait(req);
2151
2152        ptlrpc_req_finished(req);
2153        return rc;
2154}
2155
2156static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2157                         void *karg, void __user *uarg)
2158{
2159        struct obd_device *obd = exp->exp_obd;
2160        struct obd_ioctl_data *data = karg;
2161        struct obd_import *imp = obd->u.cli.cl_import;
2162        int rc;
2163
2164        if (!try_module_get(THIS_MODULE)) {
2165                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2166                       module_name(THIS_MODULE));
2167                return -EINVAL;
2168        }
2169        switch (cmd) {
2170        case OBD_IOC_CHANGELOG_SEND:
2171                rc = mdc_ioc_changelog_send(obd, karg);
2172                goto out;
2173        case OBD_IOC_CHANGELOG_CLEAR: {
2174                struct ioc_changelog *icc = karg;
2175                struct changelog_setinfo cs = {
2176                        .cs_recno = icc->icc_recno,
2177                        .cs_id = icc->icc_id
2178                };
2179
2180                rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR),
2181                                        KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
2182                                        NULL);
2183                goto out;
2184        }
2185        case OBD_IOC_FID2PATH:
2186                rc = mdc_ioc_fid2path(exp, karg);
2187                goto out;
2188        case LL_IOC_HSM_CT_START:
2189                rc = mdc_ioc_hsm_ct_start(exp, karg);
2190                /* ignore if it was already registered on this MDS. */
2191                if (rc == -EEXIST)
2192                        rc = 0;
2193                goto out;
2194        case LL_IOC_HSM_PROGRESS:
2195                rc = mdc_ioc_hsm_progress(exp, karg);
2196                goto out;
2197        case LL_IOC_HSM_STATE_GET:
2198                rc = mdc_ioc_hsm_state_get(exp, karg);
2199                goto out;
2200        case LL_IOC_HSM_STATE_SET:
2201                rc = mdc_ioc_hsm_state_set(exp, karg);
2202                goto out;
2203        case LL_IOC_HSM_ACTION:
2204                rc = mdc_ioc_hsm_current_action(exp, karg);
2205                goto out;
2206        case LL_IOC_HSM_REQUEST:
2207                rc = mdc_ioc_hsm_request(exp, karg);
2208                goto out;
2209        case OBD_IOC_CLIENT_RECOVER:
2210                rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
2211                if (rc < 0)
2212                        goto out;
2213                rc = 0;
2214                goto out;
2215        case IOC_OSC_SET_ACTIVE:
2216                rc = ptlrpc_set_import_active(imp, data->ioc_offset);
2217                goto out;
2218        case OBD_IOC_POLL_QUOTACHECK:
2219                rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2220                goto out;
2221        case OBD_IOC_PING_TARGET:
2222                rc = ptlrpc_obd_ping(obd);
2223                goto out;
2224        /*
2225         * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
2226         * LMV instead of MDC. But when the cluster is upgraded from 1.8,
2227         * there'd be no LMV layer thus we might be called here. Eventually
2228         * this code should be removed.
2229         * bz20731, LU-592.
2230         */
2231        case IOC_OBD_STATFS: {
2232                struct obd_statfs stat_buf = {0};
2233
2234                if (*((__u32 *)data->ioc_inlbuf2) != 0) {
2235                        rc = -ENODEV;
2236                        goto out;
2237                }
2238
2239                /* copy UUID */
2240                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
2241                                 min_t(size_t, data->ioc_plen2,
2242                                       sizeof(struct obd_uuid)))) {
2243                        rc = -EFAULT;
2244                        goto out;
2245                }
2246
2247                rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
2248                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
2249                                0);
2250                if (rc != 0)
2251                        goto out;
2252
2253                if (copy_to_user(data->ioc_pbuf1, &stat_buf,
2254                                 min_t(size_t, data->ioc_plen1,
2255                                       sizeof(stat_buf)))) {
2256                        rc = -EFAULT;
2257                        goto out;
2258                }
2259
2260                rc = 0;
2261                goto out;
2262        }
2263        case OBD_IOC_QUOTACTL: {
2264                struct if_quotactl *qctl = karg;
2265                struct obd_quotactl *oqctl;
2266
2267                oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
2268                if (!oqctl) {
2269                        rc = -ENOMEM;
2270                        goto out;
2271                }
2272
2273                QCTL_COPY(oqctl, qctl);
2274                rc = obd_quotactl(exp, oqctl);
2275                if (rc == 0) {
2276                        QCTL_COPY(qctl, oqctl);
2277                        qctl->qc_valid = QC_MDTIDX;
2278                        qctl->obd_uuid = obd->u.cli.cl_target_uuid;
2279                }
2280
2281                kfree(oqctl);
2282                goto out;
2283        }
2284        case LL_IOC_GET_CONNECT_FLAGS:
2285                if (copy_to_user(uarg, exp_connect_flags_ptr(exp),
2286                                 sizeof(*exp_connect_flags_ptr(exp)))) {
2287                        rc = -EFAULT;
2288                        goto out;
2289                }
2290
2291                rc = 0;
2292                goto out;
2293        case LL_IOC_LOV_SWAP_LAYOUTS:
2294                rc = mdc_ioc_swap_layouts(exp, karg);
2295                goto out;
2296        default:
2297                CERROR("unrecognised ioctl: cmd = %#x\n", cmd);
2298                rc = -ENOTTY;
2299                goto out;
2300        }
2301out:
2302        module_put(THIS_MODULE);
2303
2304        return rc;
2305}
2306
2307static int mdc_get_info_rpc(struct obd_export *exp,
2308                            u32 keylen, void *key,
2309                            int vallen, void *val)
2310{
2311        struct obd_import      *imp = class_exp2cliimp(exp);
2312        struct ptlrpc_request  *req;
2313        char               *tmp;
2314        int                  rc = -EINVAL;
2315
2316        req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO);
2317        if (!req)
2318                return -ENOMEM;
2319
2320        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
2321                             RCL_CLIENT, keylen);
2322        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
2323                             RCL_CLIENT, sizeof(__u32));
2324
2325        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
2326        if (rc) {
2327                ptlrpc_request_free(req);
2328                return rc;
2329        }
2330
2331        tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
2332        memcpy(tmp, key, keylen);
2333        tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
2334        memcpy(tmp, &vallen, sizeof(__u32));
2335
2336        req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
2337                             RCL_SERVER, vallen);
2338        ptlrpc_request_set_replen(req);
2339
2340        rc = ptlrpc_queue_wait(req);
2341        /* -EREMOTE means the get_info result is partial, and it needs to
2342         * continue on another MDT, see fid2path part in lmv_iocontrol
2343         */
2344        if (rc == 0 || rc == -EREMOTE) {
2345                tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
2346                memcpy(val, tmp, vallen);
2347                if (ptlrpc_rep_need_swab(req)) {
2348                        if (KEY_IS(KEY_FID2PATH))
2349                                lustre_swab_fid2path(val);
2350                }
2351        }
2352        ptlrpc_req_finished(req);
2353
2354        return rc;
2355}
2356
2357static void lustre_swab_hai(struct hsm_action_item *h)
2358{
2359        __swab32s(&h->hai_len);
2360        __swab32s(&h->hai_action);
2361        lustre_swab_lu_fid(&h->hai_fid);
2362        lustre_swab_lu_fid(&h->hai_dfid);
2363        __swab64s(&h->hai_cookie);
2364        __swab64s(&h->hai_extent.offset);
2365        __swab64s(&h->hai_extent.length);
2366        __swab64s(&h->hai_gid);
2367}
2368
2369static void lustre_swab_hal(struct hsm_action_list *h)
2370{
2371        struct hsm_action_item  *hai;
2372        u32 i;
2373
2374        __swab32s(&h->hal_version);
2375        __swab32s(&h->hal_count);
2376        __swab32s(&h->hal_archive_id);
2377        __swab64s(&h->hal_flags);
2378        hai = hai_first(h);
2379        for (i = 0; i < h->hal_count; i++, hai = hai_next(hai))
2380                lustre_swab_hai(hai);
2381}
2382
2383static void lustre_swab_kuch(struct kuc_hdr *l)
2384{
2385        __swab16s(&l->kuc_magic);
2386        /* __u8 l->kuc_transport */
2387        __swab16s(&l->kuc_msgtype);
2388        __swab16s(&l->kuc_msglen);
2389}
2390
2391static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
2392                                struct lustre_kernelcomm *lk)
2393{
2394        struct obd_import  *imp = class_exp2cliimp(exp);
2395        __u32               archive = lk->lk_data;
2396        int                 rc = 0;
2397
2398        if (lk->lk_group != KUC_GRP_HSM) {
2399                CERROR("Bad copytool group %d\n", lk->lk_group);
2400                return -EINVAL;
2401        }
2402
2403        CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd,
2404               lk->lk_uid, lk->lk_group, lk->lk_flags);
2405
2406        if (lk->lk_flags & LK_FLG_STOP) {
2407                /* Unregister with the coordinator */
2408                rc = mdc_ioc_hsm_ct_unregister(imp);
2409        } else {
2410                rc = mdc_ioc_hsm_ct_register(imp, archive);
2411        }
2412
2413        return rc;
2414}
2415
2416/**
2417 * Send a message to any listening copytools
2418 * @param val KUC message (kuc_hdr + hsm_action_list)
2419 * @param len total length of message
2420 */
2421static int mdc_hsm_copytool_send(size_t len, void *val)
2422{
2423        struct kuc_hdr          *lh = (struct kuc_hdr *)val;
2424        struct hsm_action_list  *hal = (struct hsm_action_list *)(lh + 1);
2425
2426        if (len < sizeof(*lh) + sizeof(*hal)) {
2427                CERROR("Short HSM message %zu < %zu\n", len,
2428                       sizeof(*lh) + sizeof(*hal));
2429                return -EPROTO;
2430        }
2431        if (lh->kuc_magic == __swab16(KUC_MAGIC)) {
2432                lustre_swab_kuch(lh);
2433                lustre_swab_hal(hal);
2434        } else if (lh->kuc_magic != KUC_MAGIC) {
2435                CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC);
2436                return -EPROTO;
2437        }
2438
2439        CDEBUG(D_HSM,
2440               "Received message mg=%x t=%d m=%d l=%d actions=%d on %s\n",
2441               lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype,
2442               lh->kuc_msglen, hal->hal_count, hal->hal_fsname);
2443
2444        /* Broadcast to HSM listeners */
2445        return libcfs_kkuc_group_put(KUC_GRP_HSM, lh);
2446}
2447
2448/**
2449 * callback function passed to kuc for re-registering each HSM copytool
2450 * running on MDC, after MDT shutdown/recovery.
2451 * @param data copytool registration data
2452 * @param cb_arg callback argument (obd_import)
2453 */
2454static int mdc_hsm_ct_reregister(void *data, void *cb_arg)
2455{
2456        struct kkuc_ct_data     *kcd = data;
2457        struct obd_import       *imp = (struct obd_import *)cb_arg;
2458        int                      rc;
2459
2460        if (!kcd || kcd->kcd_magic != KKUC_CT_DATA_MAGIC)
2461                return -EPROTO;
2462
2463        if (!obd_uuid_equals(&kcd->kcd_uuid, &imp->imp_obd->obd_uuid))
2464                return 0;
2465
2466        CDEBUG(D_HA, "%s: recover copytool registration to MDT (archive=%#x)\n",
2467               imp->imp_obd->obd_name, kcd->kcd_archive);
2468        rc = mdc_ioc_hsm_ct_register(imp, kcd->kcd_archive);
2469
2470        /* ignore error if the copytool is already registered */
2471        return (rc == -EEXIST) ? 0 : rc;
2472}
2473
2474static int mdc_set_info_async(const struct lu_env *env,
2475                              struct obd_export *exp,
2476                              u32 keylen, void *key,
2477                              u32 vallen, void *val,
2478                              struct ptlrpc_request_set *set)
2479{
2480        struct obd_import       *imp = class_exp2cliimp(exp);
2481        int                      rc;
2482
2483        if (KEY_IS(KEY_READ_ONLY)) {
2484                if (vallen != sizeof(int))
2485                        return -EINVAL;
2486
2487                spin_lock(&imp->imp_lock);
2488                if (*((int *)val)) {
2489                        imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
2490                        imp->imp_connect_data.ocd_connect_flags |=
2491                                                        OBD_CONNECT_RDONLY;
2492                } else {
2493                        imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
2494                        imp->imp_connect_data.ocd_connect_flags &=
2495                                                        ~OBD_CONNECT_RDONLY;
2496                }
2497                spin_unlock(&imp->imp_lock);
2498
2499                return do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2500                                         keylen, key, vallen, val, set);
2501        }
2502        if (KEY_IS(KEY_SPTLRPC_CONF)) {
2503                sptlrpc_conf_client_adapt(exp->exp_obd);
2504                return 0;
2505        }
2506        if (KEY_IS(KEY_FLUSH_CTX)) {
2507                sptlrpc_import_flush_my_ctx(imp);
2508                return 0;
2509        }
2510        if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2511                rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2512                                       keylen, key, vallen, val, set);
2513                return rc;
2514        }
2515        if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) {
2516                rc = mdc_hsm_copytool_send(vallen, val);
2517                return rc;
2518        }
2519        if (KEY_IS(KEY_DEFAULT_EASIZE)) {
2520                u32 *default_easize = val;
2521
2522                exp->exp_obd->u.cli.cl_default_mds_easize = *default_easize;
2523                return 0;
2524        }
2525
2526        CERROR("Unknown key %s\n", (char *)key);
2527        return -EINVAL;
2528}
2529
2530static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
2531                        __u32 keylen, void *key, __u32 *vallen, void *val,
2532                        struct lov_stripe_md *lsm)
2533{
2534        int rc = -EINVAL;
2535
2536        if (KEY_IS(KEY_MAX_EASIZE)) {
2537                u32 mdsize, *max_easize;
2538
2539                if (*vallen != sizeof(int))
2540                        return -EINVAL;
2541                mdsize = *(u32 *)val;
2542                if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
2543                        exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
2544                max_easize = val;
2545                *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
2546                return 0;
2547        } else if (KEY_IS(KEY_DEFAULT_EASIZE)) {
2548                u32 *default_easize;
2549
2550                if (*vallen != sizeof(int))
2551                        return -EINVAL;
2552                default_easize = val;
2553                *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize;
2554                return 0;
2555        } else if (KEY_IS(KEY_CONN_DATA)) {
2556                struct obd_import *imp = class_exp2cliimp(exp);
2557                struct obd_connect_data *data = val;
2558
2559                if (*vallen != sizeof(*data))
2560                        return -EINVAL;
2561
2562                *data = imp->imp_connect_data;
2563                return 0;
2564        } else if (KEY_IS(KEY_TGT_COUNT)) {
2565                *((u32 *)val) = 1;
2566                return 0;
2567        }
2568
2569        rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val);
2570
2571        return rc;
2572}
2573
2574static int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
2575                    struct ptlrpc_request **request)
2576{
2577        struct ptlrpc_request *req;
2578        int                 rc;
2579
2580        *request = NULL;
2581        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC);
2582        if (!req)
2583                return -ENOMEM;
2584
2585        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
2586        if (rc) {
2587                ptlrpc_request_free(req);
2588                return rc;
2589        }
2590
2591        mdc_pack_body(req, fid, 0, 0, -1, 0);
2592
2593        ptlrpc_request_set_replen(req);
2594
2595        rc = ptlrpc_queue_wait(req);
2596        if (rc)
2597                ptlrpc_req_finished(req);
2598        else
2599                *request = req;
2600        return rc;
2601}
2602
2603static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
2604                            enum obd_import_event event)
2605{
2606        int rc = 0;
2607
2608        LASSERT(imp->imp_obd == obd);
2609
2610        switch (event) {
2611        case IMP_EVENT_DISCON: {
2612#if 0
2613                /* XXX Pass event up to OBDs stack. used only for FLD now */
2614                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL);
2615#endif
2616                break;
2617        }
2618        case IMP_EVENT_INACTIVE: {
2619                struct client_obd *cli = &obd->u.cli;
2620                /*
2621                 * Flush current sequence to make client obtain new one
2622                 * from server in case of disconnect/reconnect.
2623                 */
2624                if (cli->cl_seq)
2625                        seq_client_flush(cli->cl_seq);
2626
2627                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2628                break;
2629        }
2630        case IMP_EVENT_INVALIDATE: {
2631                struct ldlm_namespace *ns = obd->obd_namespace;
2632
2633                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2634
2635                break;
2636        }
2637        case IMP_EVENT_ACTIVE:
2638                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2639                /* redo the kuc registration after reconnecting */
2640                if (rc == 0)
2641                        /* re-register HSM agents */
2642                        rc = libcfs_kkuc_group_foreach(KUC_GRP_HSM,
2643                                                       mdc_hsm_ct_reregister,
2644                                                       (void *)imp);
2645                break;
2646        case IMP_EVENT_OCD:
2647                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2648                break;
2649        case IMP_EVENT_DEACTIVATE:
2650        case IMP_EVENT_ACTIVATE:
2651                break;
2652        default:
2653                CERROR("Unknown import event %x\n", event);
2654                LBUG();
2655        }
2656        return rc;
2657}
2658
2659int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
2660                  struct lu_fid *fid, struct md_op_data *op_data)
2661{
2662        struct client_obd *cli = &exp->exp_obd->u.cli;
2663        struct lu_client_seq *seq = cli->cl_seq;
2664
2665        return seq_client_alloc_fid(env, seq, fid);
2666}
2667
2668static struct obd_uuid *mdc_get_uuid(struct obd_export *exp)
2669{
2670        struct client_obd *cli = &exp->exp_obd->u.cli;
2671
2672        return &cli->cl_target_uuid;
2673}
2674
2675/**
2676 * Determine whether the lock can be canceled before replaying it during
2677 * recovery, non zero value will be return if the lock can be canceled,
2678 * or zero returned for not
2679 */
2680static int mdc_cancel_weight(struct ldlm_lock *lock)
2681{
2682        if (lock->l_resource->lr_type != LDLM_IBITS)
2683                return 0;
2684
2685        /* FIXME: if we ever get into a situation where there are too many
2686         * opened files with open locks on a single node, then we really
2687         * should replay these open locks to reget it
2688         */
2689        if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN)
2690                return 0;
2691
2692        return 1;
2693}
2694
2695static int mdc_resource_inode_free(struct ldlm_resource *res)
2696{
2697        if (res->lr_lvb_inode)
2698                res->lr_lvb_inode = NULL;
2699
2700        return 0;
2701}
2702
2703static struct ldlm_valblock_ops inode_lvbo = {
2704        .lvbo_free = mdc_resource_inode_free,
2705};
2706
2707static int mdc_llog_init(struct obd_device *obd)
2708{
2709        struct obd_llog_group   *olg = &obd->obd_olg;
2710        struct llog_ctxt        *ctxt;
2711        int                      rc;
2712
2713        rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, obd,
2714                        &llog_client_ops);
2715        if (rc)
2716                return rc;
2717
2718        ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
2719        llog_initiator_connect(ctxt);
2720        llog_ctxt_put(ctxt);
2721
2722        return 0;
2723}
2724
2725static void mdc_llog_finish(struct obd_device *obd)
2726{
2727        struct llog_ctxt *ctxt;
2728
2729        ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
2730        if (ctxt)
2731                llog_cleanup(NULL, ctxt);
2732}
2733
2734static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
2735{
2736        struct client_obd *cli = &obd->u.cli;
2737        struct lprocfs_static_vars lvars = { NULL };
2738        int rc;
2739
2740        cli->cl_rpc_lock = kzalloc(sizeof(*cli->cl_rpc_lock), GFP_NOFS);
2741        if (!cli->cl_rpc_lock)
2742                return -ENOMEM;
2743        mdc_init_rpc_lock(cli->cl_rpc_lock);
2744
2745        rc = ptlrpcd_addref();
2746        if (rc < 0)
2747                goto err_rpc_lock;
2748
2749        cli->cl_close_lock = kzalloc(sizeof(*cli->cl_close_lock), GFP_NOFS);
2750        if (!cli->cl_close_lock) {
2751                rc = -ENOMEM;
2752                goto err_ptlrpcd_decref;
2753        }
2754        mdc_init_rpc_lock(cli->cl_close_lock);
2755
2756        rc = client_obd_setup(obd, cfg);
2757        if (rc)
2758                goto err_close_lock;
2759        lprocfs_mdc_init_vars(&lvars);
2760        lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
2761        sptlrpc_lprocfs_cliobd_attach(obd);
2762        ptlrpc_lprocfs_register_obd(obd);
2763
2764        ns_register_cancel(obd->obd_namespace, mdc_cancel_weight);
2765
2766        obd->obd_namespace->ns_lvbo = &inode_lvbo;
2767
2768        rc = mdc_llog_init(obd);
2769        if (rc) {
2770                mdc_cleanup(obd);
2771                CERROR("failed to setup llogging subsystems\n");
2772        }
2773
2774        return rc;
2775
2776err_close_lock:
2777        kfree(cli->cl_close_lock);
2778err_ptlrpcd_decref:
2779        ptlrpcd_decref();
2780err_rpc_lock:
2781        kfree(cli->cl_rpc_lock);
2782        return rc;
2783}
2784
2785/* Initialize the default and maximum LOV EA and cookie sizes.  This allows
2786 * us to make MDS RPCs with large enough reply buffers to hold a default
2787 * sized EA and cookie without having to calculate this (via a call into the
2788 * LOV + OSCs) each time we make an RPC.  The maximum size is also tracked
2789 * but not used to avoid wastefully vmalloc()'ing large reply buffers when
2790 * a large number of stripes is possible.  If a larger reply buffer is
2791 * required it will be reallocated in the ptlrpc layer due to overflow.
2792 */
2793static int mdc_init_ea_size(struct obd_export *exp, u32 easize, u32 def_easize,
2794                            u32 cookiesize, u32 def_cookiesize)
2795{
2796        struct obd_device *obd = exp->exp_obd;
2797        struct client_obd *cli = &obd->u.cli;
2798
2799        if (cli->cl_max_mds_easize < easize)
2800                cli->cl_max_mds_easize = easize;
2801
2802        if (cli->cl_default_mds_easize < def_easize)
2803                cli->cl_default_mds_easize = def_easize;
2804
2805        if (cli->cl_max_mds_cookiesize < cookiesize)
2806                cli->cl_max_mds_cookiesize = cookiesize;
2807
2808        if (cli->cl_default_mds_cookiesize < def_cookiesize)
2809                cli->cl_default_mds_cookiesize = def_cookiesize;
2810
2811        return 0;
2812}
2813
2814static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2815{
2816        switch (stage) {
2817        case OBD_CLEANUP_EARLY:
2818                break;
2819        case OBD_CLEANUP_EXPORTS:
2820                /* Failsafe, ok if racy */
2821                if (obd->obd_type->typ_refcnt <= 1)
2822                        libcfs_kkuc_group_rem(0, KUC_GRP_HSM);
2823
2824                obd_cleanup_client_import(obd);
2825                ptlrpc_lprocfs_unregister_obd(obd);
2826                lprocfs_obd_cleanup(obd);
2827
2828                mdc_llog_finish(obd);
2829                break;
2830        }
2831        return 0;
2832}
2833
2834static int mdc_cleanup(struct obd_device *obd)
2835{
2836        struct client_obd *cli = &obd->u.cli;
2837
2838        kfree(cli->cl_rpc_lock);
2839        kfree(cli->cl_close_lock);
2840
2841        ptlrpcd_decref();
2842
2843        return client_obd_cleanup(obd);
2844}
2845
2846static int mdc_process_config(struct obd_device *obd, u32 len, void *buf)
2847{
2848        struct lustre_cfg *lcfg = buf;
2849        struct lprocfs_static_vars lvars = { NULL };
2850        int rc = 0;
2851
2852        lprocfs_mdc_init_vars(&lvars);
2853        switch (lcfg->lcfg_command) {
2854        default:
2855                rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
2856                                              lcfg, obd);
2857                if (rc > 0)
2858                        rc = 0;
2859                break;
2860        }
2861        return rc;
2862}
2863
2864static struct obd_ops mdc_obd_ops = {
2865        .owner          = THIS_MODULE,
2866        .setup          = mdc_setup,
2867        .precleanup     = mdc_precleanup,
2868        .cleanup        = mdc_cleanup,
2869        .add_conn       = client_import_add_conn,
2870        .del_conn       = client_import_del_conn,
2871        .connect        = client_connect_import,
2872        .disconnect     = client_disconnect_export,
2873        .iocontrol      = mdc_iocontrol,
2874        .set_info_async = mdc_set_info_async,
2875        .statfs         = mdc_statfs,
2876        .fid_init       = client_fid_init,
2877        .fid_fini       = client_fid_fini,
2878        .fid_alloc      = mdc_fid_alloc,
2879        .import_event   = mdc_import_event,
2880        .get_info       = mdc_get_info,
2881        .process_config = mdc_process_config,
2882        .get_uuid       = mdc_get_uuid,
2883        .quotactl       = mdc_quotactl,
2884        .quotacheck     = mdc_quotacheck
2885};
2886
2887static struct md_ops mdc_md_ops = {
2888        .getstatus              = mdc_getstatus,
2889        .null_inode             = mdc_null_inode,
2890        .close                  = mdc_close,
2891        .create                 = mdc_create,
2892        .done_writing           = mdc_done_writing,
2893        .enqueue                = mdc_enqueue,
2894        .getattr                = mdc_getattr,
2895        .getattr_name           = mdc_getattr_name,
2896        .intent_lock            = mdc_intent_lock,
2897        .link                   = mdc_link,
2898        .rename                 = mdc_rename,
2899        .setattr                = mdc_setattr,
2900        .setxattr               = mdc_setxattr,
2901        .getxattr               = mdc_getxattr,
2902        .sync                   = mdc_sync,
2903        .read_page              = mdc_read_page,
2904        .unlink                 = mdc_unlink,
2905        .cancel_unused          = mdc_cancel_unused,
2906        .init_ea_size           = mdc_init_ea_size,
2907        .set_lock_data          = mdc_set_lock_data,
2908        .lock_match             = mdc_lock_match,
2909        .get_lustre_md          = mdc_get_lustre_md,
2910        .free_lustre_md         = mdc_free_lustre_md,
2911        .set_open_replay_data   = mdc_set_open_replay_data,
2912        .clear_open_replay_data = mdc_clear_open_replay_data,
2913        .intent_getattr_async   = mdc_intent_getattr_async,
2914        .revalidate_lock        = mdc_revalidate_lock
2915};
2916
2917static int __init mdc_init(void)
2918{
2919        struct lprocfs_static_vars lvars = { NULL };
2920
2921        lprocfs_mdc_init_vars(&lvars);
2922
2923        return class_register_type(&mdc_obd_ops, &mdc_md_ops,
2924                                 LUSTRE_MDC_NAME, NULL);
2925}
2926
2927static void /*__exit*/ mdc_exit(void)
2928{
2929        class_unregister_type(LUSTRE_MDC_NAME);
2930}
2931
2932MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2933MODULE_DESCRIPTION("Lustre Metadata Client");
2934MODULE_VERSION(LUSTRE_VERSION_STRING);
2935MODULE_LICENSE("GPL");
2936
2937module_init(mdc_init);
2938module_exit(mdc_exit);
2939