linux/drivers/staging/lustre/lustre/lmv/lmv_obd.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * GPL HEADER START
   4 *
   5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6 *
   7 * This program is free software; you can redistribute it and/or modify
   8 * it under the terms of the GNU General Public License version 2 only,
   9 * as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License version 2 for more details (a copy is included
  15 * in the LICENSE file that accompanied this code).
  16 *
  17 * You should have received a copy of the GNU General Public License
  18 * version 2 along with this program; If not, see
  19 * http://www.gnu.org/licenses/gpl-2.0.html
  20 *
  21 * GPL HEADER END
  22 */
  23/*
  24 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
  25 * Use is subject to license terms.
  26 *
  27 * Copyright (c) 2011, 2015, Intel Corporation.
  28 */
  29/*
  30 * This file is part of Lustre, http://www.lustre.org/
  31 * Lustre is a trademark of Sun Microsystems, Inc.
  32 */
  33
  34#define DEBUG_SUBSYSTEM S_LMV
  35#include <linux/slab.h>
  36#include <linux/module.h>
  37#include <linux/init.h>
  38#include <linux/pagemap.h>
  39#include <linux/mm.h>
  40#include <asm/div64.h>
  41#include <linux/seq_file.h>
  42#include <linux/namei.h>
  43#include <linux/uaccess.h>
  44
  45#include <obd_support.h>
  46#include <lustre_net.h>
  47#include <obd_class.h>
  48#include <lustre_lmv.h>
  49#include <lprocfs_status.h>
  50#include <cl_object.h>
  51#include <lustre_fid.h>
  52#include <uapi/linux/lustre/lustre_ioctl.h>
  53#include <lustre_kernelcomm.h>
  54#include "lmv_internal.h"
  55
  /*
 * Forward declaration: lmv_connect() below calls lmv_check_connect() before
 * its definition appears later in this file.
 */
  56static int lmv_check_connect(struct obd_device *obd);
  57
  58static void lmv_activate_target(struct lmv_obd *lmv,
  59                                struct lmv_tgt_desc *tgt,
  60                                int activate)
  61{
  62        if (tgt->ltd_active == activate)
  63                return;
  64
  65        tgt->ltd_active = activate;
  66        lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
  67        tgt->ltd_exp->exp_obd->obd_inactive = !activate;
  68}
  69
  70/**
  71 * Error codes:
  72 *
  73 *  -EINVAL  : UUID can't be found in the LMV's target list
  74 *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
  75 *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
  76 */
  77static int lmv_set_mdc_active(struct lmv_obd *lmv, const struct obd_uuid *uuid,
  78                              int activate)
  79{
  80        struct lmv_tgt_desc *tgt = NULL;
  81        struct obd_device      *obd;
  82        u32                  i;
  83        int                  rc = 0;
  84
  85        CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
  86               lmv, uuid->uuid, activate);
  87
  88        spin_lock(&lmv->lmv_lock);
  89        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
  90                tgt = lmv->tgts[i];
  91                if (!tgt || !tgt->ltd_exp)
  92                        continue;
  93
  94                CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
  95                       tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
  96
  97                if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
  98                        break;
  99        }
 100
 101        if (i == lmv->desc.ld_tgt_count) {
 102                rc = -EINVAL;
 103                goto out_lmv_lock;
 104        }
 105
 106        obd = class_exp2obd(tgt->ltd_exp);
 107        if (!obd) {
 108                rc = -ENOTCONN;
 109                goto out_lmv_lock;
 110        }
 111
 112        CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
 113               obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
 114               obd->obd_type->typ_name, i);
 115        LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
 116
 117        if (tgt->ltd_active == activate) {
 118                CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
 119                       activate ? "" : "in");
 120                goto out_lmv_lock;
 121        }
 122
 123        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
 124               activate ? "" : "in");
 125        lmv_activate_target(lmv, tgt, activate);
 126
 127 out_lmv_lock:
 128        spin_unlock(&lmv->lmv_lock);
 129        return rc;
 130}
 131
 132static struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
 133{
 134        struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
 135        struct lmv_tgt_desc *tgt = lmv->tgts[0];
 136
 137        return tgt ? obd_get_uuid(tgt->ltd_exp) : NULL;
 138}
 139
 140static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
 141                      enum obd_notify_event ev, void *data)
 142{
 143        struct obd_connect_data *conn_data;
 144        struct lmv_obd    *lmv = &obd->u.lmv;
 145        struct obd_uuid  *uuid;
 146        int                   rc = 0;
 147
 148        if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
 149                CERROR("unexpected notification of %s %s!\n",
 150                       watched->obd_type->typ_name,
 151                       watched->obd_name);
 152                return -EINVAL;
 153        }
 154
 155        uuid = &watched->u.cli.cl_target_uuid;
 156        if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
 157                /*
 158                 * Set MDC as active before notifying the observer, so the
 159                 * observer can use the MDC normally.
 160                 */
 161                rc = lmv_set_mdc_active(lmv, uuid,
 162                                        ev == OBD_NOTIFY_ACTIVE);
 163                if (rc) {
 164                        CERROR("%sactivation of %s failed: %d\n",
 165                               ev == OBD_NOTIFY_ACTIVE ? "" : "de",
 166                               uuid->uuid, rc);
 167                        return rc;
 168                }
 169        } else if (ev == OBD_NOTIFY_OCD) {
 170                conn_data = &watched->u.cli.cl_import->imp_connect_data;
 171                /*
 172                 * XXX: Make sure that ocd_connect_flags from all targets are
 173                 * the same. Otherwise one of MDTs runs wrong version or
 174                 * something like this.  --umka
 175                 */
 176                obd->obd_self_export->exp_connect_data = *conn_data;
 177        }
 178
 179        /*
 180         * Pass the notification up the chain.
 181         */
 182        if (obd->obd_observer)
 183                rc = obd_notify(obd->obd_observer, watched, ev, data);
 184
 185        return rc;
 186}
 187
 188static int lmv_connect(const struct lu_env *env,
 189                       struct obd_export **pexp, struct obd_device *obd,
 190                       struct obd_uuid *cluuid, struct obd_connect_data *data,
 191                       void *localdata)
 192{
 193        struct lmv_obd  *lmv = &obd->u.lmv;
 194        struct lustre_handle  conn = { 0 };
 195        struct obd_export *exp;
 196        int                 rc = 0;
 197
 198        rc = class_connect(&conn, obd, cluuid);
 199        if (rc) {
 200                CERROR("class_connection() returned %d\n", rc);
 201                return rc;
 202        }
 203
 204        exp = class_conn2export(&conn);
 205
 206        lmv->connected = 0;
 207        lmv->cluuid = *cluuid;
 208        lmv->conn_data = *data;
 209
 210        lmv->lmv_tgts_kobj = kobject_create_and_add("target_obds",
 211                                                    &obd->obd_kobj);
 212        rc = lmv_check_connect(obd);
 213        if (rc)
 214                goto out_sysfs;
 215
 216        *pexp = exp;
 217
 218        return rc;
 219
 220out_sysfs:
 221        if (lmv->lmv_tgts_kobj)
 222                kobject_put(lmv->lmv_tgts_kobj);
 223
 224        class_disconnect(exp);
 225
 226        return rc;
 227}
 228
 229static int lmv_init_ea_size(struct obd_export *exp, u32 easize, u32 def_easize)
 230{
 231        struct obd_device   *obd = exp->exp_obd;
 232        struct lmv_obd      *lmv = &obd->u.lmv;
 233        u32 i;
 234        int               rc = 0;
 235        int               change = 0;
 236
 237        if (lmv->max_easize < easize) {
 238                lmv->max_easize = easize;
 239                change = 1;
 240        }
 241        if (lmv->max_def_easize < def_easize) {
 242                lmv->max_def_easize = def_easize;
 243                change = 1;
 244        }
 245
 246        if (change == 0)
 247                return 0;
 248
 249        if (lmv->connected == 0)
 250                return 0;
 251
 252        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 253                struct lmv_tgt_desc *tgt = lmv->tgts[i];
 254
 255                if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) {
 256                        CWARN("%s: NULL export for %d\n", obd->obd_name, i);
 257                        continue;
 258                }
 259
 260                rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize);
 261                if (rc) {
 262                        CERROR("%s: obd_init_ea_size() failed on MDT target %d: rc = %d\n",
 263                               obd->obd_name, i, rc);
 264                        break;
 265                }
 266        }
 267        return rc;
 268}
 269
 /*
 * Presumably an upper bound for string buffers.
 * NOTE(review): not referenced in the part of this file that was reviewed;
 * confirm it still has users before relying on or removing it.
 */
 270#define MAX_STRING_SIZE 128
 271
 272static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 273{
 274        struct lmv_obd    *lmv = &obd->u.lmv;
 275        struct obd_uuid  *cluuid = &lmv->cluuid;
 276        struct obd_uuid   lmv_mdc_uuid = { "LMV_MDC_UUID" };
 277        struct obd_device       *mdc_obd;
 278        struct obd_export       *mdc_exp;
 279        struct lu_fld_target     target;
 280        int                   rc;
 281
 282        mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
 283                                        &obd->obd_uuid);
 284        if (!mdc_obd) {
 285                CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
 286                return -EINVAL;
 287        }
 288
 289        CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
 290               mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
 291               tgt->ltd_uuid.uuid, obd->obd_uuid.uuid, cluuid->uuid);
 292
 293        if (!mdc_obd->obd_set_up) {
 294                CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
 295                return -EINVAL;
 296        }
 297
 298        rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
 299                         &lmv->conn_data, NULL);
 300        if (rc) {
 301                CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
 302                return rc;
 303        }
 304
 305        /*
 306         * Init fid sequence client for this mdc and add new fld target.
 307         */
 308        rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
 309        if (rc)
 310                return rc;
 311
 312        target.ft_srv = NULL;
 313        target.ft_exp = mdc_exp;
 314        target.ft_idx = tgt->ltd_idx;
 315
 316        fld_client_add_target(&lmv->lmv_fld, &target);
 317
 318        rc = obd_register_observer(mdc_obd, obd);
 319        if (rc) {
 320                obd_disconnect(mdc_exp);
 321                CERROR("target %s register_observer error %d\n",
 322                       tgt->ltd_uuid.uuid, rc);
 323                return rc;
 324        }
 325
 326        if (obd->obd_observer) {
 327                /*
 328                 * Tell the observer about the new target.
 329                 */
 330                rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
 331                                OBD_NOTIFY_ACTIVE,
 332                                (void *)(tgt - lmv->tgts[0]));
 333                if (rc) {
 334                        obd_disconnect(mdc_exp);
 335                        return rc;
 336                }
 337        }
 338
 339        tgt->ltd_active = 1;
 340        tgt->ltd_exp = mdc_exp;
 341        lmv->desc.ld_active_tgt_count++;
 342
 343        md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize);
 344
 345        CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
 346               mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
 347               atomic_read(&obd->obd_refcount));
 348
 349        if (lmv->lmv_tgts_kobj)
 350                /* Even if we failed to create the link, that's fine */
 351                rc = sysfs_create_link(lmv->lmv_tgts_kobj, &mdc_obd->obd_kobj,
 352                                       mdc_obd->obd_name);
 353        return 0;
 354}
 355
 356static void lmv_del_target(struct lmv_obd *lmv, int index)
 357{
 358        if (!lmv->tgts[index])
 359                return;
 360
 361        kfree(lmv->tgts[index]);
 362        lmv->tgts[index] = NULL;
 363}
 364
 365static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 366                          __u32 index, int gen)
 367{
 368        struct lmv_obd      *lmv = &obd->u.lmv;
 369        struct obd_device *mdc_obd;
 370        struct lmv_tgt_desc *tgt;
 371        int orig_tgt_count = 0;
 372        int               rc = 0;
 373
 374        CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
 375
 376        mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
 377                                        &obd->obd_uuid);
 378        if (!mdc_obd) {
 379                CERROR("%s: Target %s not attached: rc = %d\n",
 380                       obd->obd_name, uuidp->uuid, -EINVAL);
 381                return -EINVAL;
 382        }
 383
 384        mutex_lock(&lmv->lmv_init_mutex);
 385
 386        if ((index < lmv->tgts_size) && lmv->tgts[index]) {
 387                tgt = lmv->tgts[index];
 388                CERROR("%s: UUID %s already assigned at LOV target index %d: rc = %d\n",
 389                       obd->obd_name,
 390                       obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
 391                mutex_unlock(&lmv->lmv_init_mutex);
 392                return -EEXIST;
 393        }
 394
 395        if (index >= lmv->tgts_size) {
 396                /* We need to reallocate the lmv target array. */
 397                struct lmv_tgt_desc **newtgts, **old = NULL;
 398                __u32 newsize = 1;
 399                __u32 oldsize = 0;
 400
 401                while (newsize < index + 1)
 402                        newsize <<= 1;
 403                newtgts = kcalloc(newsize, sizeof(*newtgts), GFP_NOFS);
 404                if (!newtgts) {
 405                        mutex_unlock(&lmv->lmv_init_mutex);
 406                        return -ENOMEM;
 407                }
 408
 409                if (lmv->tgts_size) {
 410                        memcpy(newtgts, lmv->tgts,
 411                               sizeof(*newtgts) * lmv->tgts_size);
 412                        old = lmv->tgts;
 413                        oldsize = lmv->tgts_size;
 414                }
 415
 416                lmv->tgts = newtgts;
 417                lmv->tgts_size = newsize;
 418                smp_rmb();
 419                kfree(old);
 420
 421                CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
 422                       lmv->tgts_size);
 423        }
 424
 425        tgt = kzalloc(sizeof(*tgt), GFP_NOFS);
 426        if (!tgt) {
 427                mutex_unlock(&lmv->lmv_init_mutex);
 428                return -ENOMEM;
 429        }
 430
 431        mutex_init(&tgt->ltd_fid_mutex);
 432        tgt->ltd_idx = index;
 433        tgt->ltd_uuid = *uuidp;
 434        tgt->ltd_active = 0;
 435        lmv->tgts[index] = tgt;
 436        if (index >= lmv->desc.ld_tgt_count) {
 437                orig_tgt_count = lmv->desc.ld_tgt_count;
 438                lmv->desc.ld_tgt_count = index + 1;
 439        }
 440
 441        if (!lmv->connected) {
 442                /* lmv_check_connect() will connect this target. */
 443                mutex_unlock(&lmv->lmv_init_mutex);
 444                return rc;
 445        }
 446
 447        /* Otherwise let's connect it ourselves */
 448        mutex_unlock(&lmv->lmv_init_mutex);
 449        rc = lmv_connect_mdc(obd, tgt);
 450        if (rc) {
 451                spin_lock(&lmv->lmv_lock);
 452                if (lmv->desc.ld_tgt_count == index + 1)
 453                        lmv->desc.ld_tgt_count = orig_tgt_count;
 454                memset(tgt, 0, sizeof(*tgt));
 455                spin_unlock(&lmv->lmv_lock);
 456        } else {
 457                int easize = sizeof(struct lmv_stripe_md) +
 458                             lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
 459                lmv_init_ea_size(obd->obd_self_export, easize, 0);
 460        }
 461
 462        return rc;
 463}
 464
 465static int lmv_check_connect(struct obd_device *obd)
 466{
 467        struct lmv_obd       *lmv = &obd->u.lmv;
 468        struct lmv_tgt_desc  *tgt;
 469        u32 i;
 470        int                rc;
 471        int                easize;
 472
 473        if (lmv->connected)
 474                return 0;
 475
 476        mutex_lock(&lmv->lmv_init_mutex);
 477        if (lmv->connected) {
 478                mutex_unlock(&lmv->lmv_init_mutex);
 479                return 0;
 480        }
 481
 482        if (lmv->desc.ld_tgt_count == 0) {
 483                mutex_unlock(&lmv->lmv_init_mutex);
 484                CERROR("%s: no targets configured.\n", obd->obd_name);
 485                return -EINVAL;
 486        }
 487
 488        LASSERT(lmv->tgts);
 489
 490        if (!lmv->tgts[0]) {
 491                mutex_unlock(&lmv->lmv_init_mutex);
 492                CERROR("%s: no target configured for index 0.\n",
 493                       obd->obd_name);
 494                return -EINVAL;
 495        }
 496
 497        CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
 498               lmv->cluuid.uuid, obd->obd_name);
 499
 500        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 501                tgt = lmv->tgts[i];
 502                if (!tgt)
 503                        continue;
 504                rc = lmv_connect_mdc(obd, tgt);
 505                if (rc)
 506                        goto out_disc;
 507        }
 508
 509        lmv->connected = 1;
 510        easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC);
 511        lmv_init_ea_size(obd->obd_self_export, easize, 0);
 512        mutex_unlock(&lmv->lmv_init_mutex);
 513        return 0;
 514
 515 out_disc:
 516        while (i-- > 0) {
 517                int rc2;
 518
 519                tgt = lmv->tgts[i];
 520                if (!tgt)
 521                        continue;
 522                tgt->ltd_active = 0;
 523                if (tgt->ltd_exp) {
 524                        --lmv->desc.ld_active_tgt_count;
 525                        rc2 = obd_disconnect(tgt->ltd_exp);
 526                        if (rc2) {
 527                                CERROR("LMV target %s disconnect on MDC idx %d: error %d\n",
 528                                       tgt->ltd_uuid.uuid, i, rc2);
 529                        }
 530                }
 531        }
 532
 533        mutex_unlock(&lmv->lmv_init_mutex);
 534        return rc;
 535}
 536
 537static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 538{
 539        struct lmv_obd   *lmv = &obd->u.lmv;
 540        struct obd_device      *mdc_obd;
 541        int                  rc;
 542
 543        mdc_obd = class_exp2obd(tgt->ltd_exp);
 544
 545        if (mdc_obd) {
 546                mdc_obd->obd_force = obd->obd_force;
 547                mdc_obd->obd_fail = obd->obd_fail;
 548                mdc_obd->obd_no_recov = obd->obd_no_recov;
 549
 550                if (lmv->lmv_tgts_kobj)
 551                        sysfs_remove_link(lmv->lmv_tgts_kobj,
 552                                          mdc_obd->obd_name);
 553        }
 554
 555        rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
 556        if (rc)
 557                CERROR("Can't finalize fids factory\n");
 558
 559        CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
 560               tgt->ltd_exp->exp_obd->obd_name,
 561               tgt->ltd_exp->exp_obd->obd_uuid.uuid);
 562
 563        obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
 564        rc = obd_disconnect(tgt->ltd_exp);
 565        if (rc) {
 566                if (tgt->ltd_active) {
 567                        CERROR("Target %s disconnect error %d\n",
 568                               tgt->ltd_uuid.uuid, rc);
 569                }
 570        }
 571
 572        lmv_activate_target(lmv, tgt, 0);
 573        tgt->ltd_exp = NULL;
 574        return 0;
 575}
 576
 577static int lmv_disconnect(struct obd_export *exp)
 578{
 579        struct obd_device     *obd = class_exp2obd(exp);
 580        struct lmv_obd  *lmv = &obd->u.lmv;
 581        int                 rc;
 582        u32 i;
 583
 584        if (!lmv->tgts)
 585                goto out_local;
 586
 587        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 588                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
 589                        continue;
 590
 591                lmv_disconnect_mdc(obd, lmv->tgts[i]);
 592        }
 593
 594        if (lmv->lmv_tgts_kobj)
 595                kobject_put(lmv->lmv_tgts_kobj);
 596
 597out_local:
 598        /*
 599         * This is the case when no real connection is established by
 600         * lmv_check_connect().
 601         */
 602        if (!lmv->connected)
 603                class_export_put(exp);
 604        rc = class_disconnect(exp);
 605        lmv->connected = 0;
 606        return rc;
 607}
 608
 609static int lmv_fid2path(struct obd_export *exp, int len, void *karg,
 610                        void __user *uarg)
 611{
 612        struct obd_device       *obddev = class_exp2obd(exp);
 613        struct lmv_obd          *lmv = &obddev->u.lmv;
 614        struct getinfo_fid2path *gf;
 615        struct lmv_tgt_desc     *tgt;
 616        struct getinfo_fid2path *remote_gf = NULL;
 617        int                     remote_gf_size = 0;
 618        int                     rc;
 619
 620        gf = karg;
 621        tgt = lmv_find_target(lmv, &gf->gf_fid);
 622        if (IS_ERR(tgt))
 623                return PTR_ERR(tgt);
 624
 625repeat_fid2path:
 626        rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
 627        if (rc != 0 && rc != -EREMOTE)
 628                goto out_fid2path;
 629
 630        /* If remote_gf != NULL, it means just building the
 631         * path on the remote MDT, copy this path segment to gf
 632         */
 633        if (remote_gf) {
 634                struct getinfo_fid2path *ori_gf;
 635                char *ptr;
 636
 637                ori_gf = karg;
 638                if (strlen(ori_gf->gf_path) + 1 +
 639                    strlen(gf->gf_path) + 1 > ori_gf->gf_pathlen) {
 640                        rc = -EOVERFLOW;
 641                        goto out_fid2path;
 642                }
 643
 644                ptr = ori_gf->gf_path;
 645
 646                memmove(ptr + strlen(gf->gf_path) + 1, ptr,
 647                        strlen(ori_gf->gf_path));
 648
 649                strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
 650                ptr += strlen(gf->gf_path);
 651                *ptr = '/';
 652        }
 653
 654        CDEBUG(D_INFO, "%s: get path %s " DFID " rec: %llu ln: %u\n",
 655               tgt->ltd_exp->exp_obd->obd_name,
 656               gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
 657               gf->gf_linkno);
 658
 659        if (rc == 0)
 660                goto out_fid2path;
 661
 662        /* sigh, has to go to another MDT to do path building further */
 663        if (!remote_gf) {
 664                remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
 665                remote_gf = kzalloc(remote_gf_size, GFP_NOFS);
 666                if (!remote_gf) {
 667                        rc = -ENOMEM;
 668                        goto out_fid2path;
 669                }
 670                remote_gf->gf_pathlen = PATH_MAX;
 671        }
 672
 673        if (!fid_is_sane(&gf->gf_fid)) {
 674                CERROR("%s: invalid FID " DFID ": rc = %d\n",
 675                       tgt->ltd_exp->exp_obd->obd_name,
 676                       PFID(&gf->gf_fid), -EINVAL);
 677                rc = -EINVAL;
 678                goto out_fid2path;
 679        }
 680
 681        tgt = lmv_find_target(lmv, &gf->gf_fid);
 682        if (IS_ERR(tgt)) {
 683                rc = -EINVAL;
 684                goto out_fid2path;
 685        }
 686
 687        remote_gf->gf_fid = gf->gf_fid;
 688        remote_gf->gf_recno = -1;
 689        remote_gf->gf_linkno = -1;
 690        memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
 691        gf = remote_gf;
 692        goto repeat_fid2path;
 693
 694out_fid2path:
 695        kfree(remote_gf);
 696        return rc;
 697}
 698
 699static int lmv_hsm_req_count(struct lmv_obd *lmv,
 700                             const struct hsm_user_request *hur,
 701                             const struct lmv_tgt_desc *tgt_mds)
 702{
 703        u32 i, nr = 0;
 704        struct lmv_tgt_desc    *curr_tgt;
 705
 706        /* count how many requests must be sent to the given target */
 707        for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
 708                curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
 709                if (IS_ERR(curr_tgt))
 710                        return PTR_ERR(curr_tgt);
 711                if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
 712                        nr++;
 713        }
 714        return nr;
 715}
 716
 717static int lmv_hsm_req_build(struct lmv_obd *lmv,
 718                             struct hsm_user_request *hur_in,
 719                             const struct lmv_tgt_desc *tgt_mds,
 720                             struct hsm_user_request *hur_out)
 721{
 722        int                     i, nr_out;
 723        struct lmv_tgt_desc    *curr_tgt;
 724
 725        /* build the hsm_user_request for the given target */
 726        hur_out->hur_request = hur_in->hur_request;
 727        nr_out = 0;
 728        for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
 729                curr_tgt = lmv_find_target(lmv,
 730                                           &hur_in->hur_user_item[i].hui_fid);
 731                if (IS_ERR(curr_tgt))
 732                        return PTR_ERR(curr_tgt);
 733                if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
 734                        hur_out->hur_user_item[nr_out] =
 735                                hur_in->hur_user_item[i];
 736                        nr_out++;
 737                }
 738        }
 739        hur_out->hur_request.hr_itemcount = nr_out;
 740        memcpy(hur_data(hur_out), hur_data(hur_in),
 741               hur_in->hur_request.hr_data_len);
 742
 743        return 0;
 744}
 745
 746static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
 747                                 struct lustre_kernelcomm *lk,
 748                                 void __user *uarg)
 749{
 750        __u32 i;
 751
 752        /* unregister request (call from llapi_hsm_copytool_fini) */
 753        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 754                struct lmv_tgt_desc *tgt = lmv->tgts[i];
 755
 756                if (!tgt || !tgt->ltd_exp)
 757                        continue;
 758
 759                /* best effort: try to clean as much as possible
 760                 * (continue on error)
 761                 */
 762                obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
 763        }
 764
 765        /* Whatever the result, remove copytool from kuc groups.
 766         * Unreached coordinators will get EPIPE on next requests
 767         * and will unregister automatically.
 768         */
 769        return libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
 770}
 771
 772static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
 773                               struct lustre_kernelcomm *lk, void __user *uarg)
 774{
 775        struct file *filp;
 776        __u32 i, j;
 777        int err, rc = 0;
 778        bool any_set = false;
 779        struct kkuc_ct_data kcd = { 0 };
 780
 781        /* All or nothing: try to register to all MDS.
 782         * In case of failure, unregister from previous MDS,
 783         * except if it because of inactive target.
 784         */
 785        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 786                struct lmv_tgt_desc *tgt = lmv->tgts[i];
 787
 788                if (!tgt || !tgt->ltd_exp)
 789                        continue;
 790
 791                err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
 792                if (err) {
 793                        if (tgt->ltd_active) {
 794                                /* permanent error */
 795                                CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
 796                                       tgt->ltd_uuid.uuid, i, cmd, err);
 797                                rc = err;
 798                                lk->lk_flags |= LK_FLG_STOP;
 799                                /* unregister from previous MDS */
 800                                for (j = 0; j < i; j++) {
 801                                        tgt = lmv->tgts[j];
 802
 803                                        if (!tgt || !tgt->ltd_exp)
 804                                                continue;
 805                                        obd_iocontrol(cmd, tgt->ltd_exp, len,
 806                                                      lk, uarg);
 807                                }
 808                                return rc;
 809                        }
 810                        /* else: transient error.
 811                         * kuc will register to the missing MDT when it is back
 812                         */
 813                } else {
 814                        any_set = true;
 815                }
 816        }
 817
 818        if (!any_set)
 819                /* no registration done: return error */
 820                return -ENOTCONN;
 821
 822        /* at least one registration done, with no failure */
 823        filp = fget(lk->lk_wfd);
 824        if (!filp)
 825                return -EBADF;
 826
 827        kcd.kcd_magic = KKUC_CT_DATA_MAGIC;
 828        kcd.kcd_uuid = lmv->cluuid;
 829        kcd.kcd_archive = lk->lk_data;
 830
 831        rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group,
 832                                   &kcd, sizeof(kcd));
 833        if (rc)
 834                fput(filp);
 835
 836        return rc;
 837}
 838
 839static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
 840                         int len, void *karg, void __user *uarg)
 841{
 842        struct obd_device    *obddev = class_exp2obd(exp);
 843        struct lmv_obd       *lmv = &obddev->u.lmv;
 844        struct lmv_tgt_desc *tgt = NULL;
 845        u32 i = 0;
 846        int                rc = 0;
 847        int                set = 0;
 848        u32 count = lmv->desc.ld_tgt_count;
 849
 850        if (count == 0)
 851                return -ENOTTY;
 852
 853        switch (cmd) {
 854        case IOC_OBD_STATFS: {
 855                struct obd_ioctl_data *data = karg;
 856                struct obd_device *mdc_obd;
 857                struct obd_statfs stat_buf = {0};
 858                __u32 index;
 859
 860                memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
 861                if (index >= count)
 862                        return -ENODEV;
 863
 864                tgt = lmv->tgts[index];
 865                if (!tgt || !tgt->ltd_active)
 866                        return -ENODATA;
 867
 868                mdc_obd = class_exp2obd(tgt->ltd_exp);
 869                if (!mdc_obd)
 870                        return -EINVAL;
 871
 872                /* copy UUID */
 873                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
 874                                 min((int)data->ioc_plen2,
 875                                     (int)sizeof(struct obd_uuid))))
 876                        return -EFAULT;
 877
 878                rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
 879                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
 880                                0);
 881                if (rc)
 882                        return rc;
 883                if (copy_to_user(data->ioc_pbuf1, &stat_buf,
 884                                 min((int)data->ioc_plen1,
 885                                     (int)sizeof(stat_buf))))
 886                        return -EFAULT;
 887                break;
 888        }
 889        case OBD_IOC_QUOTACTL: {
 890                struct if_quotactl *qctl = karg;
 891                struct obd_quotactl *oqctl;
 892
 893                if (qctl->qc_valid == QC_MDTIDX) {
 894                        if (count <= qctl->qc_idx)
 895                                return -EINVAL;
 896
 897                        tgt = lmv->tgts[qctl->qc_idx];
 898                        if (!tgt || !tgt->ltd_exp)
 899                                return -EINVAL;
 900                } else if (qctl->qc_valid == QC_UUID) {
 901                        for (i = 0; i < count; i++) {
 902                                tgt = lmv->tgts[i];
 903                                if (!tgt)
 904                                        continue;
 905                                if (!obd_uuid_equals(&tgt->ltd_uuid,
 906                                                     &qctl->obd_uuid))
 907                                        continue;
 908
 909                                if (!tgt->ltd_exp)
 910                                        return -EINVAL;
 911
 912                                break;
 913                        }
 914                } else {
 915                        return -EINVAL;
 916                }
 917
 918                if (i >= count)
 919                        return -EAGAIN;
 920
 921                LASSERT(tgt && tgt->ltd_exp);
 922                oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
 923                if (!oqctl)
 924                        return -ENOMEM;
 925
 926                QCTL_COPY(oqctl, qctl);
 927                rc = obd_quotactl(tgt->ltd_exp, oqctl);
 928                if (rc == 0) {
 929                        QCTL_COPY(qctl, oqctl);
 930                        qctl->qc_valid = QC_MDTIDX;
 931                        qctl->obd_uuid = tgt->ltd_uuid;
 932                }
 933                kfree(oqctl);
 934                break;
 935        }
 936        case OBD_IOC_CHANGELOG_SEND:
 937        case OBD_IOC_CHANGELOG_CLEAR: {
 938                struct ioc_changelog *icc = karg;
 939
 940                if (icc->icc_mdtindex >= count)
 941                        return -ENODEV;
 942
 943                tgt = lmv->tgts[icc->icc_mdtindex];
 944                if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
 945                        return -ENODEV;
 946                rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
 947                break;
 948        }
 949        case LL_IOC_GET_CONNECT_FLAGS: {
 950                tgt = lmv->tgts[0];
 951
 952                if (!tgt || !tgt->ltd_exp)
 953                        return -ENODATA;
 954                rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
 955                break;
 956        }
 957        case LL_IOC_FID2MDTIDX: {
 958                struct lu_fid *fid = karg;
 959                int mdt_index;
 960
 961                rc = lmv_fld_lookup(lmv, fid, &mdt_index);
 962                if (rc)
 963                        return rc;
 964
 965                /*
 966                 * Note: this is from llite(see ll_dir_ioctl()), @uarg does not
 967                 * point to user space memory for FID2MDTIDX.
 968                 */
 969                *(__u32 *)uarg = mdt_index;
 970                break;
 971        }
 972        case OBD_IOC_FID2PATH: {
 973                rc = lmv_fid2path(exp, len, karg, uarg);
 974                break;
 975        }
 976        case LL_IOC_HSM_STATE_GET:
 977        case LL_IOC_HSM_STATE_SET:
 978        case LL_IOC_HSM_ACTION: {
 979                struct md_op_data       *op_data = karg;
 980
 981                tgt = lmv_find_target(lmv, &op_data->op_fid1);
 982                if (IS_ERR(tgt))
 983                        return PTR_ERR(tgt);
 984
 985                if (!tgt->ltd_exp)
 986                        return -EINVAL;
 987
 988                rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
 989                break;
 990        }
 991        case LL_IOC_HSM_PROGRESS: {
 992                const struct hsm_progress_kernel *hpk = karg;
 993
 994                tgt = lmv_find_target(lmv, &hpk->hpk_fid);
 995                if (IS_ERR(tgt))
 996                        return PTR_ERR(tgt);
 997                rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
 998                break;
 999        }
1000        case LL_IOC_HSM_REQUEST: {
1001                struct hsm_user_request *hur = karg;
1002                unsigned int reqcount = hur->hur_request.hr_itemcount;
1003
1004                if (reqcount == 0)
1005                        return 0;
1006
1007                /* if the request is about a single fid
1008                 * or if there is a single MDS, no need to split
1009                 * the request.
1010                 */
1011                if (reqcount == 1 || count == 1) {
1012                        tgt = lmv_find_target(lmv,
1013                                              &hur->hur_user_item[0].hui_fid);
1014                        if (IS_ERR(tgt))
1015                                return PTR_ERR(tgt);
1016                        rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1017                } else {
1018                        /* split fid list to their respective MDS */
1019                        for (i = 0; i < count; i++) {
1020                                struct hsm_user_request *req;
1021                                size_t reqlen;
1022                                int nr, rc1;
1023
1024                                tgt = lmv->tgts[i];
1025                                if (!tgt || !tgt->ltd_exp)
1026                                        continue;
1027
1028                                nr = lmv_hsm_req_count(lmv, hur, tgt);
1029                                if (nr < 0)
1030                                        return nr;
1031                                if (nr == 0) /* nothing for this MDS */
1032                                        continue;
1033
1034                                /* build a request with fids for this MDS */
1035                                reqlen = offsetof(typeof(*hur),
1036                                                  hur_user_item[nr])
1037                                         + hur->hur_request.hr_data_len;
1038                                req = libcfs_kvzalloc(reqlen, GFP_NOFS);
1039                                if (!req)
1040                                        return -ENOMEM;
1041
1042                                rc1 = lmv_hsm_req_build(lmv, hur, tgt, req);
1043                                if (rc1 < 0)
1044                                        goto hsm_req_err;
1045
1046                                rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1047                                                    req, uarg);
1048hsm_req_err:
1049                                if (rc1 != 0 && rc == 0)
1050                                        rc = rc1;
1051                                kvfree(req);
1052                        }
1053                }
1054                break;
1055        }
1056        case LL_IOC_LOV_SWAP_LAYOUTS: {
1057                struct md_op_data       *op_data = karg;
1058                struct lmv_tgt_desc     *tgt1, *tgt2;
1059
1060                tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1061                if (IS_ERR(tgt1))
1062                        return PTR_ERR(tgt1);
1063
1064                tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1065                if (IS_ERR(tgt2))
1066                        return PTR_ERR(tgt2);
1067
1068                if (!tgt1->ltd_exp || !tgt2->ltd_exp)
1069                        return -EINVAL;
1070
1071                /* only files on same MDT can have their layouts swapped */
1072                if (tgt1->ltd_idx != tgt2->ltd_idx)
1073                        return -EPERM;
1074
1075                rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1076                break;
1077        }
1078        case LL_IOC_HSM_CT_START: {
1079                struct lustre_kernelcomm *lk = karg;
1080
1081                if (lk->lk_flags & LK_FLG_STOP)
1082                        rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1083                else
1084                        rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1085                break;
1086        }
1087        default:
1088                for (i = 0; i < count; i++) {
1089                        struct obd_device *mdc_obd;
1090                        int err;
1091
1092                        tgt = lmv->tgts[i];
1093                        if (!tgt || !tgt->ltd_exp)
1094                                continue;
1095                        /* ll_umount_begin() sets force flag but for lmv, not
1096                         * mdc. Let's pass it through
1097                         */
1098                        mdc_obd = class_exp2obd(tgt->ltd_exp);
1099                        mdc_obd->obd_force = obddev->obd_force;
1100                        err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1101                        if (err) {
1102                                if (tgt->ltd_active) {
1103                                        CERROR("%s: error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
1104                                               lmv2obd_dev(lmv)->obd_name,
1105                                               tgt->ltd_uuid.uuid, i, cmd, err);
1106                                        if (!rc)
1107                                                rc = err;
1108                                }
1109                        } else {
1110                                set = 1;
1111                        }
1112                }
1113                if (!set && !rc)
1114                        rc = -EIO;
1115        }
1116        return rc;
1117}
1118
1119/**
1120 * This is _inode_ placement policy function (not name).
1121 */
1122static int lmv_placement_policy(struct obd_device *obd,
1123                                struct md_op_data *op_data, u32 *mds)
1124{
1125        struct lmv_obd    *lmv = &obd->u.lmv;
1126
1127        LASSERT(mds);
1128
1129        if (lmv->desc.ld_tgt_count == 1) {
1130                *mds = 0;
1131                return 0;
1132        }
1133
1134        if (op_data->op_default_stripe_offset != -1) {
1135                *mds = op_data->op_default_stripe_offset;
1136                return 0;
1137        }
1138
1139        /**
1140         * If stripe_offset is provided during setdirstripe
1141         * (setdirstripe -i xx), xx MDS will be chosen.
1142         */
1143        if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data) {
1144                struct lmv_user_md *lum;
1145
1146                lum = op_data->op_data;
1147                if (le32_to_cpu(lum->lum_stripe_offset) != (__u32)-1) {
1148                        *mds = le32_to_cpu(lum->lum_stripe_offset);
1149                } else {
1150                        /*
1151                         * -1 means default, which will be in the same MDT with
1152                         * the stripe
1153                         */
1154                        *mds = op_data->op_mds;
1155                        lum->lum_stripe_offset = cpu_to_le32(op_data->op_mds);
1156                }
1157        } else {
1158                /*
1159                 * Allocate new fid on target according to operation type and
1160                 * parent home mds.
1161                 */
1162                *mds = op_data->op_mds;
1163        }
1164
1165        return 0;
1166}
1167
1168int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
1169{
1170        struct lmv_tgt_desc     *tgt;
1171        int                      rc;
1172
1173        tgt = lmv_get_target(lmv, mds, NULL);
1174        if (IS_ERR(tgt))
1175                return PTR_ERR(tgt);
1176
1177        /*
1178         * New seq alloc and FLD setup should be atomic. Otherwise we may find
1179         * on server that seq in new allocated fid is not yet known.
1180         */
1181        mutex_lock(&tgt->ltd_fid_mutex);
1182
1183        if (tgt->ltd_active == 0 || !tgt->ltd_exp) {
1184                rc = -ENODEV;
1185                goto out;
1186        }
1187
1188        /*
1189         * Asking underlaying tgt layer to allocate new fid.
1190         */
1191        rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL);
1192        if (rc > 0) {
1193                LASSERT(fid_is_sane(fid));
1194                rc = 0;
1195        }
1196
1197out:
1198        mutex_unlock(&tgt->ltd_fid_mutex);
1199        return rc;
1200}
1201
1202int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1203                  struct lu_fid *fid, struct md_op_data *op_data)
1204{
1205        struct obd_device     *obd = class_exp2obd(exp);
1206        struct lmv_obd  *lmv = &obd->u.lmv;
1207        u32                    mds = 0;
1208        int                 rc;
1209
1210        LASSERT(op_data);
1211        LASSERT(fid);
1212
1213        rc = lmv_placement_policy(obd, op_data, &mds);
1214        if (rc) {
1215                CERROR("Can't get target for allocating fid, rc %d\n",
1216                       rc);
1217                return rc;
1218        }
1219
1220        rc = __lmv_fid_alloc(lmv, fid, mds);
1221        if (rc) {
1222                CERROR("Can't alloc new fid, rc %d\n", rc);
1223                return rc;
1224        }
1225
1226        return rc;
1227}
1228
1229static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1230{
1231        struct lmv_obd       *lmv = &obd->u.lmv;
1232        struct lprocfs_static_vars  lvars = { NULL };
1233        struct lmv_desc     *desc;
1234        int                      rc;
1235
1236        if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1237                CERROR("LMV setup requires a descriptor\n");
1238                return -EINVAL;
1239        }
1240
1241        desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1242        if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1243                CERROR("Lmv descriptor size wrong: %d > %d\n",
1244                       (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1245                return -EINVAL;
1246        }
1247
1248        lmv->tgts_size = 32U;
1249        lmv->tgts = kcalloc(lmv->tgts_size, sizeof(*lmv->tgts), GFP_NOFS);
1250        if (!lmv->tgts)
1251                return -ENOMEM;
1252
1253        obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1254        lmv->desc.ld_tgt_count = 0;
1255        lmv->desc.ld_active_tgt_count = 0;
1256        lmv->max_def_easize = 0;
1257        lmv->max_easize = 0;
1258
1259        spin_lock_init(&lmv->lmv_lock);
1260        mutex_init(&lmv->lmv_init_mutex);
1261
1262        lprocfs_lmv_init_vars(&lvars);
1263
1264        lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
1265        rc = ldebugfs_seq_create(obd->obd_debugfs_entry, "target_obd",
1266                                 0444, &lmv_proc_target_fops, obd);
1267        if (rc)
1268                CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1269                      obd->obd_name, rc);
1270        rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1271                             LUSTRE_CLI_FLD_HASH_DHT);
1272        if (rc) {
1273                CERROR("Can't init FLD, err %d\n", rc);
1274                goto out;
1275        }
1276
1277        return 0;
1278
1279out:
1280        return rc;
1281}
1282
1283static int lmv_cleanup(struct obd_device *obd)
1284{
1285        struct lmv_obd   *lmv = &obd->u.lmv;
1286
1287        fld_client_fini(&lmv->lmv_fld);
1288        if (lmv->tgts) {
1289                int i;
1290
1291                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1292                        if (!lmv->tgts[i])
1293                                continue;
1294                        lmv_del_target(lmv, i);
1295                }
1296                kfree(lmv->tgts);
1297                lmv->tgts_size = 0;
1298        }
1299        return 0;
1300}
1301
1302static int lmv_process_config(struct obd_device *obd, u32 len, void *buf)
1303{
1304        struct lustre_cfg       *lcfg = buf;
1305        struct obd_uuid         obd_uuid;
1306        int                     gen;
1307        __u32                   index;
1308        int                     rc;
1309
1310        switch (lcfg->lcfg_command) {
1311        case LCFG_ADD_MDC:
1312                /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1313                 * 2:0  3:1  4:lustre-MDT0000-mdc_UUID
1314                 */
1315                if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) {
1316                        rc = -EINVAL;
1317                        goto out;
1318                }
1319
1320                obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1321
1322                if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1) {
1323                        rc = -EINVAL;
1324                        goto out;
1325                }
1326                if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1) {
1327                        rc = -EINVAL;
1328                        goto out;
1329                }
1330                rc = lmv_add_target(obd, &obd_uuid, index, gen);
1331                goto out;
1332        default:
1333                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1334                rc = -EINVAL;
1335                goto out;
1336        }
1337out:
1338        return rc;
1339}
1340
1341static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1342                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1343{
1344        struct obd_device     *obd = class_exp2obd(exp);
1345        struct lmv_obd  *lmv = &obd->u.lmv;
1346        struct obd_statfs     *temp;
1347        int                 rc = 0;
1348        u32 i;
1349
1350        temp = kzalloc(sizeof(*temp), GFP_NOFS);
1351        if (!temp)
1352                return -ENOMEM;
1353
1354        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1355                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1356                        continue;
1357
1358                rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1359                                max_age, flags);
1360                if (rc) {
1361                        CERROR("can't stat MDS #%d (%s), error %d\n", i,
1362                               lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1363                               rc);
1364                        goto out_free_temp;
1365                }
1366
1367                if (i == 0) {
1368                        *osfs = *temp;
1369                        /* If the statfs is from mount, it will needs
1370                         * retrieve necessary information from MDT0.
1371                         * i.e. mount does not need the merged osfs
1372                         * from all of MDT.
1373                         * And also clients can be mounted as long as
1374                         * MDT0 is in service
1375                         */
1376                        if (flags & OBD_STATFS_FOR_MDT0)
1377                                goto out_free_temp;
1378                } else {
1379                        osfs->os_bavail += temp->os_bavail;
1380                        osfs->os_blocks += temp->os_blocks;
1381                        osfs->os_ffree += temp->os_ffree;
1382                        osfs->os_files += temp->os_files;
1383                }
1384        }
1385
1386out_free_temp:
1387        kfree(temp);
1388        return rc;
1389}
1390
1391static int lmv_getstatus(struct obd_export *exp,
1392                         struct lu_fid *fid)
1393{
1394        struct obd_device    *obd = exp->exp_obd;
1395        struct lmv_obd       *lmv = &obd->u.lmv;
1396
1397        return md_getstatus(lmv->tgts[0]->ltd_exp, fid);
1398}
1399
1400static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1401                        u64 valid, const char *name,
1402                        const char *input, int input_size, int output_size,
1403                        int flags, struct ptlrpc_request **request)
1404{
1405        struct obd_device      *obd = exp->exp_obd;
1406        struct lmv_obd   *lmv = &obd->u.lmv;
1407        struct lmv_tgt_desc    *tgt;
1408
1409        tgt = lmv_find_target(lmv, fid);
1410        if (IS_ERR(tgt))
1411                return PTR_ERR(tgt);
1412
1413        return md_getxattr(tgt->ltd_exp, fid, valid, name, input,
1414                         input_size, output_size, flags, request);
1415}
1416
1417static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1418                        u64 valid, const char *name,
1419                        const char *input, int input_size, int output_size,
1420                        int flags, __u32 suppgid,
1421                        struct ptlrpc_request **request)
1422{
1423        struct obd_device      *obd = exp->exp_obd;
1424        struct lmv_obd   *lmv = &obd->u.lmv;
1425        struct lmv_tgt_desc    *tgt;
1426
1427        tgt = lmv_find_target(lmv, fid);
1428        if (IS_ERR(tgt))
1429                return PTR_ERR(tgt);
1430
1431        return md_setxattr(tgt->ltd_exp, fid, valid, name, input,
1432                         input_size, output_size, flags, suppgid,
1433                         request);
1434}
1435
1436static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1437                       struct ptlrpc_request **request)
1438{
1439        struct obd_device       *obd = exp->exp_obd;
1440        struct lmv_obd    *lmv = &obd->u.lmv;
1441        struct lmv_tgt_desc     *tgt;
1442
1443        tgt = lmv_find_target(lmv, &op_data->op_fid1);
1444        if (IS_ERR(tgt))
1445                return PTR_ERR(tgt);
1446
1447        if (op_data->op_flags & MF_GET_MDT_IDX) {
1448                op_data->op_mds = tgt->ltd_idx;
1449                return 0;
1450        }
1451
1452        return md_getattr(tgt->ltd_exp, op_data, request);
1453}
1454
1455static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1456{
1457        struct obd_device   *obd = exp->exp_obd;
1458        struct lmv_obd      *lmv = &obd->u.lmv;
1459        u32 i;
1460
1461        CDEBUG(D_INODE, "CBDATA for " DFID "\n", PFID(fid));
1462
1463        /*
1464         * With DNE every object can have two locks in different namespaces:
1465         * lookup lock in space of MDT storing direntry and update/open lock in
1466         * space of MDT storing inode.
1467         */
1468        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1469                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1470                        continue;
1471                md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1472        }
1473
1474        return 0;
1475}
1476
1477static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1478                     struct md_open_data *mod, struct ptlrpc_request **request)
1479{
1480        struct obd_device     *obd = exp->exp_obd;
1481        struct lmv_obd  *lmv = &obd->u.lmv;
1482        struct lmv_tgt_desc   *tgt;
1483
1484        tgt = lmv_find_target(lmv, &op_data->op_fid1);
1485        if (IS_ERR(tgt))
1486                return PTR_ERR(tgt);
1487
1488        CDEBUG(D_INODE, "CLOSE " DFID "\n", PFID(&op_data->op_fid1));
1489        return md_close(tgt->ltd_exp, op_data, mod, request);
1490}
1491
1492/**
1493 * Choosing the MDT by name or FID in @op_data.
1494 * For non-striped directory, it will locate MDT by fid.
1495 * For striped-directory, it will locate MDT by name. And also
1496 * it will reset op_fid1 with the FID of the chosen stripe.
1497 **/
1498static struct lmv_tgt_desc *
1499lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
1500                           const char *name, int namelen, struct lu_fid *fid,
1501                           u32 *mds)
1502{
1503        const struct lmv_oinfo *oinfo;
1504        struct lmv_tgt_desc *tgt;
1505
1506        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) {
1507                if (cfs_fail_val >= lsm->lsm_md_stripe_count)
1508                        return ERR_PTR(-EBADF);
1509                oinfo = &lsm->lsm_md_oinfo[cfs_fail_val];
1510        } else {
1511                oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
1512                if (IS_ERR(oinfo))
1513                        return ERR_CAST(oinfo);
1514        }
1515
1516        if (fid)
1517                *fid = oinfo->lmo_fid;
1518        if (mds)
1519                *mds = oinfo->lmo_mds;
1520
1521        tgt = lmv_get_target(lmv, oinfo->lmo_mds, NULL);
1522
1523        CDEBUG(D_INFO, "locate on mds %u " DFID "\n", oinfo->lmo_mds,
1524               PFID(&oinfo->lmo_fid));
1525        return tgt;
1526}
1527
1528/**
1529 * Locate mds by fid or name
1530 *
1531 * For striped directory (lsm != NULL), it will locate the stripe
1532 * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type
1533 * is unknown, it will return -EBADFD, and lmv_intent_lookup might need
1534 * walk through all of stripes to locate the entry.
1535 *
1536 * For normal direcotry, it will locate MDS by FID directly.
1537 * \param[in] lmv       LMV device
1538 * \param[in] op_data   client MD stack parameters, name, namelen
1539 *                      mds_num etc.
1540 * \param[in] fid       object FID used to locate MDS.
1541 *
1542 * retval               pointer to the lmv_tgt_desc if succeed.
1543 *                      ERR_PTR(errno) if failed.
1544 */
1545struct lmv_tgt_desc*
1546lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1547               struct lu_fid *fid)
1548{
1549        struct lmv_stripe_md *lsm = op_data->op_mea1;
1550        struct lmv_tgt_desc *tgt;
1551
1552        /*
1553         * During creating VOLATILE file, it should honor the mdt
1554         * index if the file under striped dir is being restored, see
1555         * ct_restore().
1556         */
1557        if (op_data->op_bias & MDS_CREATE_VOLATILE &&
1558            (int)op_data->op_mds != -1) {
1559                int i;
1560
1561                tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
1562                if (IS_ERR(tgt))
1563                        return tgt;
1564
1565                if (lsm) {
1566                        /* refill the right parent fid */
1567                        for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
1568                                struct lmv_oinfo *oinfo;
1569
1570                                oinfo = &lsm->lsm_md_oinfo[i];
1571                                if (oinfo->lmo_mds == op_data->op_mds) {
1572                                        *fid = oinfo->lmo_fid;
1573                                        break;
1574                                }
1575                        }
1576
1577                        if (i == lsm->lsm_md_stripe_count)
1578                                *fid = lsm->lsm_md_oinfo[0].lmo_fid;
1579                }
1580
1581                return tgt;
1582        }
1583
1584        if (!lsm || !op_data->op_namelen) {
1585                tgt = lmv_find_target(lmv, fid);
1586                if (IS_ERR(tgt))
1587                        return tgt;
1588
1589                op_data->op_mds = tgt->ltd_idx;
1590
1591                return tgt;
1592        }
1593
1594        return lmv_locate_target_for_name(lmv, lsm, op_data->op_name,
1595                                          op_data->op_namelen, fid,
1596                                          &op_data->op_mds);
1597}
1598
1599static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1600                      const void *data, size_t datalen, umode_t mode,
1601                      uid_t uid, gid_t gid, cfs_cap_t cap_effective,
1602                      __u64 rdev, struct ptlrpc_request **request)
1603{
1604        struct obd_device       *obd = exp->exp_obd;
1605        struct lmv_obd    *lmv = &obd->u.lmv;
1606        struct lmv_tgt_desc     *tgt;
1607        int                   rc;
1608
1609        if (!lmv->desc.ld_active_tgt_count)
1610                return -EIO;
1611
1612        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1613        if (IS_ERR(tgt))
1614                return PTR_ERR(tgt);
1615
1616        CDEBUG(D_INODE, "CREATE name '%.*s' on " DFID " -> mds #%x\n",
1617               (int)op_data->op_namelen, op_data->op_name,
1618               PFID(&op_data->op_fid1), op_data->op_mds);
1619
1620        rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1621        if (rc)
1622                return rc;
1623
1624        if (exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE) {
1625                /*
1626                 * Send the create request to the MDT where the object
1627                 * will be located
1628                 */
1629                tgt = lmv_find_target(lmv, &op_data->op_fid2);
1630                if (IS_ERR(tgt))
1631                        return PTR_ERR(tgt);
1632
1633                op_data->op_mds = tgt->ltd_idx;
1634        } else {
1635                CDEBUG(D_CONFIG, "Server doesn't support striped dirs\n");
1636        }
1637
1638        CDEBUG(D_INODE, "CREATE obj " DFID " -> mds #%x\n",
1639               PFID(&op_data->op_fid1), op_data->op_mds);
1640
1641        op_data->op_flags |= MF_MDC_CANCEL_FID1;
1642        rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1643                       cap_effective, rdev, request);
1644
1645        if (rc == 0) {
1646                if (!*request)
1647                        return rc;
1648                CDEBUG(D_INODE, "Created - " DFID "\n", PFID(&op_data->op_fid2));
1649        }
1650        return rc;
1651}
1652
1653static int
1654lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1655            const union ldlm_policy_data *policy,
1656            struct lookup_intent *it, struct md_op_data *op_data,
1657            struct lustre_handle *lockh, __u64 extra_lock_flags)
1658{
1659        struct obd_device       *obd = exp->exp_obd;
1660        struct lmv_obd     *lmv = &obd->u.lmv;
1661        struct lmv_tgt_desc      *tgt;
1662
1663        CDEBUG(D_INODE, "ENQUEUE '%s' on " DFID "\n",
1664               LL_IT2STR(it), PFID(&op_data->op_fid1));
1665
1666        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1667        if (IS_ERR(tgt))
1668                return PTR_ERR(tgt);
1669
1670        CDEBUG(D_INODE, "ENQUEUE '%s' on " DFID " -> mds #%u\n",
1671               LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1672
1673        return md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh,
1674                        extra_lock_flags);
1675}
1676
1677static int
1678lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
1679                 struct ptlrpc_request **preq)
1680{
1681        struct ptlrpc_request   *req = NULL;
1682        struct obd_device       *obd = exp->exp_obd;
1683        struct lmv_obd    *lmv = &obd->u.lmv;
1684        struct lmv_tgt_desc     *tgt;
1685        struct mdt_body  *body;
1686        int                   rc;
1687
1688        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1689        if (IS_ERR(tgt))
1690                return PTR_ERR(tgt);
1691
1692        CDEBUG(D_INODE, "GETATTR_NAME for %*s on " DFID " -> mds #%u\n",
1693               (int)op_data->op_namelen, op_data->op_name,
1694               PFID(&op_data->op_fid1), tgt->ltd_idx);
1695
1696        rc = md_getattr_name(tgt->ltd_exp, op_data, preq);
1697        if (rc != 0)
1698                return rc;
1699
1700        body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
1701        if (body->mbo_valid & OBD_MD_MDS) {
1702                struct lu_fid rid = body->mbo_fid1;
1703
1704                CDEBUG(D_INODE, "Request attrs for " DFID "\n",
1705                       PFID(&rid));
1706
1707                tgt = lmv_find_target(lmv, &rid);
1708                if (IS_ERR(tgt)) {
1709                        ptlrpc_req_finished(*preq);
1710                        *preq = NULL;
1711                        return PTR_ERR(tgt);
1712                }
1713
1714                op_data->op_fid1 = rid;
1715                op_data->op_valid |= OBD_MD_FLCROSSREF;
1716                op_data->op_namelen = 0;
1717                op_data->op_name = NULL;
1718                rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1719                ptlrpc_req_finished(*preq);
1720                *preq = req;
1721        }
1722
1723        return rc;
1724}
1725
/*
 * Map a single MF_MDC_CANCEL_FIDn flag to the address of the matching
 * op_fidN member of @op_data; evaluates to NULL for any other value.
 *
 * Both arguments are parenthesized so that expression arguments expand
 * correctly. Note that @fl may still be evaluated up to four times, so it
 * must not have side effects.
 */
#define md_op_data_fid(op_data, fl)                             \
        ((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :     \
         (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :     \
         (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :     \
         (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :     \
         NULL)
1732
1733static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
1734                            struct md_op_data *op_data, int op_tgt,
1735                            enum ldlm_mode mode, int bits, int flag)
1736{
1737        struct lu_fid     *fid = md_op_data_fid(op_data, flag);
1738        struct obd_device      *obd = exp->exp_obd;
1739        struct lmv_obd   *lmv = &obd->u.lmv;
1740        union ldlm_policy_data policy = { { 0 } };
1741        int                  rc = 0;
1742
1743        if (!fid_is_sane(fid))
1744                return 0;
1745
1746        if (!tgt) {
1747                tgt = lmv_find_target(lmv, fid);
1748                if (IS_ERR(tgt))
1749                        return PTR_ERR(tgt);
1750        }
1751
1752        if (tgt->ltd_idx != op_tgt) {
1753                CDEBUG(D_INODE, "EARLY_CANCEL on " DFID "\n", PFID(fid));
1754                policy.l_inodebits.bits = bits;
1755                rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1756                                      mode, LCF_ASYNC, NULL);
1757        } else {
1758                CDEBUG(D_INODE,
1759                       "EARLY_CANCEL skip operation target %d on " DFID "\n",
1760                       op_tgt, PFID(fid));
1761                op_data->op_flags |= flag;
1762                rc = 0;
1763        }
1764
1765        return rc;
1766}
1767
1768/*
1769 * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1770 * op_data->op_fid2
1771 */
1772static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1773                    struct ptlrpc_request **request)
1774{
1775        struct obd_device       *obd = exp->exp_obd;
1776        struct lmv_obd    *lmv = &obd->u.lmv;
1777        struct lmv_tgt_desc     *tgt;
1778        int                   rc;
1779
1780        LASSERT(op_data->op_namelen != 0);
1781
1782        CDEBUG(D_INODE, "LINK " DFID ":%*s to " DFID "\n",
1783               PFID(&op_data->op_fid2), (int)op_data->op_namelen,
1784               op_data->op_name, PFID(&op_data->op_fid1));
1785
1786        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1787        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1788        op_data->op_cap = cfs_curproc_cap_pack();
1789        if (op_data->op_mea2) {
1790                struct lmv_stripe_md *lsm = op_data->op_mea2;
1791                const struct lmv_oinfo *oinfo;
1792
1793                oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
1794                                                op_data->op_namelen);
1795                if (IS_ERR(oinfo))
1796                        return PTR_ERR(oinfo);
1797
1798                op_data->op_fid2 = oinfo->lmo_fid;
1799        }
1800
1801        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1802        if (IS_ERR(tgt))
1803                return PTR_ERR(tgt);
1804
1805        /*
1806         * Cancel UPDATE lock on child (fid1).
1807         */
1808        op_data->op_flags |= MF_MDC_CANCEL_FID2;
1809        rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
1810                              MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1811        if (rc != 0)
1812                return rc;
1813
1814        return md_link(tgt->ltd_exp, op_data, request);
1815}
1816
1817static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1818                      const char *old, size_t oldlen,
1819                      const char *new, size_t newlen,
1820                      struct ptlrpc_request **request)
1821{
1822        struct obd_device       *obd = exp->exp_obd;
1823        struct lmv_obd    *lmv = &obd->u.lmv;
1824        struct obd_export *target_exp;
1825        struct lmv_tgt_desc     *src_tgt;
1826        struct lmv_tgt_desc *tgt_tgt;
1827        struct mdt_body *body;
1828        int                     rc;
1829
1830        LASSERT(oldlen != 0);
1831
1832        CDEBUG(D_INODE, "RENAME %.*s in " DFID ":%d to %.*s in " DFID ":%d\n",
1833               (int)oldlen, old, PFID(&op_data->op_fid1),
1834               op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
1835               (int)newlen, new, PFID(&op_data->op_fid2),
1836               op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
1837
1838        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1839        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1840        op_data->op_cap = cfs_curproc_cap_pack();
1841
1842        if (op_data->op_cli_flags & CLI_MIGRATE) {
1843                LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID " DFID "\n",
1844                         PFID(&op_data->op_fid3));
1845
1846                if (op_data->op_mea1) {
1847                        struct lmv_stripe_md *lsm = op_data->op_mea1;
1848                        struct lmv_tgt_desc *tmp;
1849
1850                        /* Fix the parent fid for striped dir */
1851                        tmp = lmv_locate_target_for_name(lmv, lsm, old,
1852                                                         oldlen,
1853                                                         &op_data->op_fid1,
1854                                                         NULL);
1855                        if (IS_ERR(tmp))
1856                                return PTR_ERR(tmp);
1857                }
1858
1859                rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1860                if (rc)
1861                        return rc;
1862                src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
1863                if (IS_ERR(src_tgt))
1864                        return PTR_ERR(src_tgt);
1865
1866                target_exp = src_tgt->ltd_exp;
1867        } else {
1868                if (op_data->op_mea1) {
1869                        struct lmv_stripe_md *lsm = op_data->op_mea1;
1870
1871                        src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
1872                                                             oldlen,
1873                                                             &op_data->op_fid1,
1874                                                             &op_data->op_mds);
1875                } else {
1876                        src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
1877                }
1878                if (IS_ERR(src_tgt))
1879                        return PTR_ERR(src_tgt);
1880
1881                if (op_data->op_mea2) {
1882                        struct lmv_stripe_md *lsm = op_data->op_mea2;
1883
1884                        tgt_tgt = lmv_locate_target_for_name(lmv, lsm, new,
1885                                                             newlen,
1886                                                             &op_data->op_fid2,
1887                                                             &op_data->op_mds);
1888                } else {
1889                        tgt_tgt = lmv_find_target(lmv, &op_data->op_fid2);
1890                }
1891                if (IS_ERR(tgt_tgt))
1892                        return PTR_ERR(tgt_tgt);
1893
1894                target_exp = tgt_tgt->ltd_exp;
1895        }
1896
1897        /*
1898         * LOOKUP lock on src child (fid3) should also be cancelled for
1899         * src_tgt in mdc_rename.
1900         */
1901        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1902
1903        /*
1904         * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1905         * own target.
1906         */
1907        rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1908                              LCK_EX, MDS_INODELOCK_UPDATE,
1909                              MF_MDC_CANCEL_FID2);
1910        if (rc)
1911                return rc;
1912        /*
1913         * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
1914         */
1915        if (fid_is_sane(&op_data->op_fid3)) {
1916                struct lmv_tgt_desc *tgt;
1917
1918                tgt = lmv_find_target(lmv, &op_data->op_fid1);
1919                if (IS_ERR(tgt))
1920                        return PTR_ERR(tgt);
1921
1922                /* Cancel LOOKUP lock on its parent */
1923                rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
1924                                      LCK_EX, MDS_INODELOCK_LOOKUP,
1925                                      MF_MDC_CANCEL_FID3);
1926                if (rc)
1927                        return rc;
1928
1929                rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1930                                      LCK_EX, MDS_INODELOCK_FULL,
1931                                      MF_MDC_CANCEL_FID3);
1932                if (rc)
1933                        return rc;
1934        }
1935
1936retry_rename:
1937        /*
1938         * Cancel all the locks on tgt child (fid4).
1939         */
1940        if (fid_is_sane(&op_data->op_fid4)) {
1941                struct lmv_tgt_desc *tgt;
1942
1943                rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1944                                      LCK_EX, MDS_INODELOCK_FULL,
1945                                      MF_MDC_CANCEL_FID4);
1946                if (rc)
1947                        return rc;
1948
1949                tgt = lmv_find_target(lmv, &op_data->op_fid4);
1950                if (IS_ERR(tgt))
1951                        return PTR_ERR(tgt);
1952
1953                /*
1954                 * Since the target child might be destroyed, and it might
1955                 * become orphan, and we can only check orphan on the local
1956                 * MDT right now, so we send rename request to the MDT where
1957                 * target child is located. If target child does not exist,
1958                 * then it will send the request to the target parent
1959                 */
1960                target_exp = tgt->ltd_exp;
1961        }
1962
1963        rc = md_rename(target_exp, op_data, old, oldlen, new, newlen, request);
1964        if (rc && rc != -EREMOTE)
1965                return rc;
1966
1967        body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
1968        if (!body)
1969                return -EPROTO;
1970
1971        /* Not cross-ref case, just get out of here. */
1972        if (likely(!(body->mbo_valid & OBD_MD_MDS)))
1973                return rc;
1974
1975        CDEBUG(D_INODE, "%s: try rename to another MDT for " DFID "\n",
1976               exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
1977
1978        op_data->op_fid4 = body->mbo_fid1;
1979        ptlrpc_req_finished(*request);
1980        *request = NULL;
1981        goto retry_rename;
1982}
1983
1984static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1985                       void *ea, size_t ealen, struct ptlrpc_request **request)
1986{
1987        struct obd_device       *obd = exp->exp_obd;
1988        struct lmv_obd    *lmv = &obd->u.lmv;
1989        struct lmv_tgt_desc     *tgt;
1990
1991        CDEBUG(D_INODE, "SETATTR for " DFID ", valid 0x%x\n",
1992               PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
1993
1994        op_data->op_flags |= MF_MDC_CANCEL_FID1;
1995        tgt = lmv_find_target(lmv, &op_data->op_fid1);
1996        if (IS_ERR(tgt))
1997                return PTR_ERR(tgt);
1998
1999        return md_setattr(tgt->ltd_exp, op_data, ea, ealen, request);
2000}
2001
2002static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2003                    struct ptlrpc_request **request)
2004{
2005        struct obd_device        *obd = exp->exp_obd;
2006        struct lmv_obd      *lmv = &obd->u.lmv;
2007        struct lmv_tgt_desc       *tgt;
2008
2009        tgt = lmv_find_target(lmv, fid);
2010        if (IS_ERR(tgt))
2011                return PTR_ERR(tgt);
2012
2013        return md_sync(tgt->ltd_exp, fid, request);
2014}
2015
2016/**
2017 * Get current minimum entry from striped directory
2018 *
2019 * This function will search the dir entry, whose hash value is the
2020 * closest(>=) to @hash_offset, from all of sub-stripes, and it is
2021 * only being called for striped directory.
2022 *
2023 * \param[in] exp               export of LMV
2024 * \param[in] op_data           parameters transferred beween client MD stack
2025 *                              stripe_information will be included in this
2026 *                              parameter
2027 * \param[in] cb_op             ldlm callback being used in enqueue in
2028 *                              mdc_read_page
2029 * \param[in] hash_offset       the hash value, which is used to locate
2030 *                              minum(closet) dir entry
2031 * \param[in|out] stripe_offset the caller use this to indicate the stripe
2032 *                              index of last entry, so to avoid hash conflict
2033 *                              between stripes. It will also be used to
2034 *                              return the stripe index of current dir entry.
2035 * \param[in|out] entp          the minum entry and it also is being used
2036 *                              to input the last dir entry to resolve the
2037 *                              hash conflict
2038 *
2039 * \param[out] ppage            the page which holds the minum entry
2040 *
2041 * \retval                      = 0 get the entry successfully
2042 *                              negative errno (< 0) does not get the entry
2043 */
2044static int lmv_get_min_striped_entry(struct obd_export *exp,
2045                                     struct md_op_data *op_data,
2046                                     struct md_callback *cb_op,
2047                                     __u64 hash_offset, int *stripe_offset,
2048                                     struct lu_dirent **entp,
2049                                     struct page **ppage)
2050{
2051        struct lmv_stripe_md *lsm = op_data->op_mea1;
2052        struct obd_device *obd = exp->exp_obd;
2053        struct lmv_obd *lmv = &obd->u.lmv;
2054        struct lu_dirent *min_ent = NULL;
2055        struct page *min_page = NULL;
2056        struct lmv_tgt_desc *tgt;
2057        int stripe_count;
2058        int min_idx = 0;
2059        int rc = 0;
2060        int i;
2061
2062        stripe_count = lsm->lsm_md_stripe_count;
2063        for (i = 0; i < stripe_count; i++) {
2064                __u64 stripe_hash = hash_offset;
2065                struct lu_dirent *ent = NULL;
2066                struct page *page = NULL;
2067                struct lu_dirpage *dp;
2068
2069                tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
2070                if (IS_ERR(tgt)) {
2071                        rc = PTR_ERR(tgt);
2072                        goto out;
2073                }
2074
2075                /*
2076                 * op_data will be shared by each stripe, so we need
2077                 * reset these value for each stripe
2078                 */
2079                op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
2080                op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
2081                op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
2082next:
2083                rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
2084                                  &page);
2085                if (rc)
2086                        goto out;
2087
2088                dp = page_address(page);
2089                for (ent = lu_dirent_start(dp); ent;
2090                     ent = lu_dirent_next(ent)) {
2091                        /* Skip dummy entry */
2092                        if (!le16_to_cpu(ent->lde_namelen))
2093                                continue;
2094
2095                        if (le64_to_cpu(ent->lde_hash) < hash_offset)
2096                                continue;
2097
2098                        if (le64_to_cpu(ent->lde_hash) == hash_offset &&
2099                            (*entp == ent || i < *stripe_offset))
2100                                continue;
2101
2102                        /* skip . and .. for other stripes */
2103                        if (i && (!strncmp(ent->lde_name, ".",
2104                                           le16_to_cpu(ent->lde_namelen)) ||
2105                                  !strncmp(ent->lde_name, "..",
2106                                           le16_to_cpu(ent->lde_namelen))))
2107                                continue;
2108                        break;
2109                }
2110
2111                if (!ent) {
2112                        stripe_hash = le64_to_cpu(dp->ldp_hash_end);
2113
2114                        kunmap(page);
2115                        put_page(page);
2116                        page = NULL;
2117
2118                        /*
2119                         * reach the end of current stripe, go to next stripe
2120                         */
2121                        if (stripe_hash == MDS_DIR_END_OFF)
2122                                continue;
2123                        else
2124                                goto next;
2125                }
2126
2127                if (min_ent) {
2128                        if (le64_to_cpu(min_ent->lde_hash) >
2129                            le64_to_cpu(ent->lde_hash)) {
2130                                min_ent = ent;
2131                                kunmap(min_page);
2132                                put_page(min_page);
2133                                min_idx = i;
2134                                min_page = page;
2135                        } else {
2136                                kunmap(page);
2137                                put_page(page);
2138                                page = NULL;
2139                        }
2140                } else {
2141                        min_ent = ent;
2142                        min_page = page;
2143                        min_idx = i;
2144                }
2145        }
2146
2147out:
2148        if (*ppage) {
2149                kunmap(*ppage);
2150                put_page(*ppage);
2151        }
2152        *stripe_offset = min_idx;
2153        *entp = min_ent;
2154        *ppage = min_page;
2155        return rc;
2156}
2157
2158/**
2159 * Build dir entry page from a striped directory
2160 *
2161 * This function gets one entry by @offset from a striped directory. It will
2162 * read entries from all of stripes, and choose one closest to the required
2163 * offset(&offset). A few notes
2164 * 1. skip . and .. for non-zero stripes, because there can only have one .
2165 * and .. in a directory.
2166 * 2. op_data will be shared by all of stripes, instead of allocating new
2167 * one, so need to restore before reusing.
2168 * 3. release the entry page if that is not being chosen.
2169 *
2170 * \param[in] exp       obd export refer to LMV
2171 * \param[in] op_data   hold those MD parameters of read_entry
2172 * \param[in] cb_op     ldlm callback being used in enqueue in mdc_read_entry
2173 * \param[out] ldp      the entry being read
2174 * \param[out] ppage    the page holding the entry. Note: because the entry
2175 *                      will be accessed in upper layer, so we need hold the
2176 *                      page until the usages of entry is finished, see
2177 *                      ll_dir_entry_next.
2178 *
2179 * retval               =0 if get entry successfully
2180 *                      <0 cannot get entry
2181 */
2182static int lmv_read_striped_page(struct obd_export *exp,
2183                                 struct md_op_data *op_data,
2184                                 struct md_callback *cb_op,
2185                                 __u64 offset, struct page **ppage)
2186{
2187        struct inode *master_inode = op_data->op_data;
2188        struct lu_fid master_fid = op_data->op_fid1;
2189        __u64 hash_offset = offset;
2190        __u32 ldp_flags;
2191        struct page *min_ent_page = NULL;
2192        struct page *ent_page = NULL;
2193        struct lu_dirent *min_ent = NULL;
2194        struct lu_dirent *last_ent;
2195        struct lu_dirent *ent;
2196        struct lu_dirpage *dp;
2197        size_t left_bytes;
2198        int ent_idx = 0;
2199        void *area;
2200        int rc;
2201
2202        /*
2203         * Allocate a page and read entries from all of stripes and fill
2204         * the page by hash order
2205         */
2206        ent_page = alloc_page(GFP_KERNEL);
2207        if (!ent_page)
2208                return -ENOMEM;
2209
2210        /* Initialize the entry page */
2211        dp = kmap(ent_page);
2212        memset(dp, 0, sizeof(*dp));
2213        dp->ldp_hash_start = cpu_to_le64(offset);
2214        ldp_flags = LDF_COLLIDE;
2215
2216        area = dp + 1;
2217        left_bytes = PAGE_SIZE - sizeof(*dp);
2218        ent = area;
2219        last_ent = ent;
2220        do {
2221                __u16 ent_size;
2222
2223                /* Find the minum entry from all sub-stripes */
2224                rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
2225                                               &ent_idx, &min_ent,
2226                                               &min_ent_page);
2227                if (rc)
2228                        goto out;
2229
2230                /*
2231                 * If it can not get minum entry, it means it already reaches
2232                 * the end of this directory
2233                 */
2234                if (!min_ent) {
2235                        last_ent->lde_reclen = 0;
2236                        hash_offset = MDS_DIR_END_OFF;
2237                        goto out;
2238                }
2239
2240                ent_size = le16_to_cpu(min_ent->lde_reclen);
2241
2242                /*
2243                 * the last entry lde_reclen is 0, but it might not
2244                 * the end of this entry of this temporay entry
2245                 */
2246                if (!ent_size)
2247                        ent_size = lu_dirent_calc_size(
2248                                        le16_to_cpu(min_ent->lde_namelen),
2249                                        le32_to_cpu(min_ent->lde_attrs));
2250                if (ent_size > left_bytes) {
2251                        last_ent->lde_reclen = cpu_to_le16(0);
2252                        hash_offset = le64_to_cpu(min_ent->lde_hash);
2253                        goto out;
2254                }
2255
2256                memcpy(ent, min_ent, ent_size);
2257
2258                /*
2259                 * Replace . with master FID and Replace .. with the parent FID
2260                 * of master object
2261                 */
2262                if (!strncmp(ent->lde_name, ".",
2263                             le16_to_cpu(ent->lde_namelen)) &&
2264                    le16_to_cpu(ent->lde_namelen) == 1)
2265                        fid_cpu_to_le(&ent->lde_fid, &master_fid);
2266                else if (!strncmp(ent->lde_name, "..",
2267                                  le16_to_cpu(ent->lde_namelen)) &&
2268                         le16_to_cpu(ent->lde_namelen) == 2)
2269                        fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
2270
2271                left_bytes -= ent_size;
2272                ent->lde_reclen = cpu_to_le16(ent_size);
2273                last_ent = ent;
2274                ent = (void *)ent + ent_size;
2275                hash_offset = le64_to_cpu(min_ent->lde_hash);
2276                if (hash_offset == MDS_DIR_END_OFF) {
2277                        last_ent->lde_reclen = 0;
2278                        break;
2279                }
2280        } while (1);
2281out:
2282        if (min_ent_page) {
2283                kunmap(min_ent_page);
2284                put_page(min_ent_page);
2285        }
2286
2287        if (unlikely(rc)) {
2288                __free_page(ent_page);
2289                ent_page = NULL;
2290        } else {
2291                if (ent == area)
2292                        ldp_flags |= LDF_EMPTY;
2293                dp->ldp_flags |= cpu_to_le32(ldp_flags);
2294                dp->ldp_hash_end = cpu_to_le64(hash_offset);
2295        }
2296
2297        /*
2298         * We do not want to allocate md_op_data during each
2299         * dir entry reading, so op_data will be shared by every stripe,
2300         * then we need to restore it back to original value before
2301         * return to the upper layer
2302         */
2303        op_data->op_fid1 = master_fid;
2304        op_data->op_fid2 = master_fid;
2305        op_data->op_data = master_inode;
2306
2307        *ppage = ent_page;
2308
2309        return rc;
2310}
2311
2312static int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
2313                         struct md_callback *cb_op, __u64 offset,
2314                         struct page **ppage)
2315{
2316        struct lmv_stripe_md *lsm = op_data->op_mea1;
2317        struct obd_device *obd = exp->exp_obd;
2318        struct lmv_obd *lmv = &obd->u.lmv;
2319        struct lmv_tgt_desc *tgt;
2320
2321        if (unlikely(lsm))
2322                return lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
2323
2324        tgt = lmv_find_target(lmv, &op_data->op_fid1);
2325        if (IS_ERR(tgt))
2326                return PTR_ERR(tgt);
2327
2328        return md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
2329}
2330
2331/**
2332 * Unlink a file/directory
2333 *
2334 * Unlink a file or directory under the parent dir. The unlink request
2335 * usually will be sent to the MDT where the child is located, but if
2336 * the client does not have the child FID then request will be sent to the
2337 * MDT where the parent is located.
2338 *
2339 * If the parent is a striped directory then it also needs to locate which
2340 * stripe the name of the child is located, and replace the parent FID
2341 * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown,
2342 * it will walk through all of sub-stripes until the child is being
2343 * unlinked finally.
2344 *
2345 * \param[in] exp       export refer to LMV
2346 * \param[in] op_data   different parameters transferred beween client
2347 *                      MD stacks, name, namelen, FIDs etc.
2348 *                      op_fid1 is the parent FID, op_fid2 is the child
2349 *                      FID.
2350 * \param[out] request point to the request of unlink.
2351 *
2352 * retval               0 if succeed
2353 *                      negative errno if failed.
2354 */
2355static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2356                      struct ptlrpc_request **request)
2357{
2358        struct lmv_stripe_md *lsm = op_data->op_mea1;
2359        struct obd_device    *obd = exp->exp_obd;
2360        struct lmv_obd    *lmv = &obd->u.lmv;
2361        struct lmv_tgt_desc *parent_tgt = NULL;
2362        struct lmv_tgt_desc     *tgt = NULL;
2363        struct mdt_body         *body;
2364        int stripe_index = 0;
2365        int                  rc;
2366
2367retry_unlink:
2368        /* For striped dir, we need to locate the parent as well */
2369        if (lsm) {
2370                struct lmv_tgt_desc *tmp;
2371
2372                LASSERT(op_data->op_name && op_data->op_namelen);
2373
2374                tmp = lmv_locate_target_for_name(lmv, lsm,
2375                                                 op_data->op_name,
2376                                                 op_data->op_namelen,
2377                                                 &op_data->op_fid1,
2378                                                 &op_data->op_mds);
2379
2380                /*
2381                 * return -EBADFD means unknown hash type, might
2382                 * need try all sub-stripe here
2383                 */
2384                if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD)
2385                        return PTR_ERR(tmp);
2386
2387                /*
2388                 * Note: both migrating dir and unknown hash dir need to
2389                 * try all of sub-stripes, so we need start search the
2390                 * name from stripe 0, but migrating dir is already handled
2391                 * inside lmv_locate_target_for_name(), so we only check
2392                 * unknown hash type directory here
2393                 */
2394                if (!lmv_is_known_hash_type(lsm->lsm_md_hash_type)) {
2395                        struct lmv_oinfo *oinfo;
2396
2397                        oinfo = &lsm->lsm_md_oinfo[stripe_index];
2398
2399                        op_data->op_fid1 = oinfo->lmo_fid;
2400                        op_data->op_mds = oinfo->lmo_mds;
2401                }
2402        }
2403
2404try_next_stripe:
2405        /* Send unlink requests to the MDT where the child is located */
2406        if (likely(!fid_is_zero(&op_data->op_fid2)))
2407                tgt = lmv_find_target(lmv, &op_data->op_fid2);
2408        else if (lsm)
2409                tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
2410        else
2411                tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2412
2413        if (IS_ERR(tgt))
2414                return PTR_ERR(tgt);
2415
2416        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2417        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2418        op_data->op_cap = cfs_curproc_cap_pack();
2419
2420        /*
2421         * If child's fid is given, cancel unused locks for it if it is from
2422         * another export than parent.
2423         *
2424         * LOOKUP lock for child (fid3) should also be cancelled on parent
2425         * tgt_tgt in mdc_unlink().
2426         */
2427        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2428
2429        /*
2430         * Cancel FULL locks on child (fid3).
2431         */
2432        parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
2433        if (IS_ERR(parent_tgt))
2434                return PTR_ERR(parent_tgt);
2435
2436        if (parent_tgt != tgt) {
2437                rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
2438                                      LCK_EX, MDS_INODELOCK_LOOKUP,
2439                                      MF_MDC_CANCEL_FID3);
2440        }
2441
2442        rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
2443                              MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2444        if (rc != 0)
2445                return rc;
2446
2447        CDEBUG(D_INODE, "unlink with fid=" DFID "/" DFID " -> mds #%u\n",
2448               PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2449
2450        rc = md_unlink(tgt->ltd_exp, op_data, request);
2451        if (rc != 0 && rc != -EREMOTE  && rc != -ENOENT)
2452                return rc;
2453
2454        /* Try next stripe if it is needed. */
2455        if (rc == -ENOENT && lsm && lmv_need_try_all_stripes(lsm)) {
2456                struct lmv_oinfo *oinfo;
2457
2458                stripe_index++;
2459                if (stripe_index >= lsm->lsm_md_stripe_count)
2460                        return rc;
2461
2462                oinfo = &lsm->lsm_md_oinfo[stripe_index];
2463
2464                op_data->op_fid1 = oinfo->lmo_fid;
2465                op_data->op_mds = oinfo->lmo_mds;
2466
2467                ptlrpc_req_finished(*request);
2468                *request = NULL;
2469
2470                goto try_next_stripe;
2471        }
2472
2473        body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2474        if (!body)
2475                return -EPROTO;
2476
2477        /* Not cross-ref case, just get out of here. */
2478        if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2479                return rc;
2480
2481        CDEBUG(D_INODE, "%s: try unlink to another MDT for " DFID "\n",
2482               exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
2483
2484        /* This is a remote object, try remote MDT, Note: it may
2485         * try more than 1 time here, Considering following case
2486         * /mnt/lustre is root on MDT0, remote1 is on MDT1
2487         * 1. Initially A does not know where remote1 is, it send
2488         *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2489         *    resend unlink RPC to MDT1 (retry 1st time).
2490         *
2491         * 2. During the unlink RPC in flight,
2492         *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2493         *    and create new remote1, but on MDT0
2494         *
2495         * 3. MDT1 get unlink RPC(from A), then do remote lock on
2496         *    /mnt/lustre, then lookup get fid of remote1, and find
2497         *    it is remote dir again, and replay -EREMOTE again.
2498         *
2499         * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2500         *
2501         * In theory, it might try unlimited time here, but it should
2502         * be very rare case.
2503         */
2504        op_data->op_fid2 = body->mbo_fid1;
2505        ptlrpc_req_finished(*request);
2506        *request = NULL;
2507
2508        goto retry_unlink;
2509}
2510
2511static int lmv_precleanup(struct obd_device *obd)
2512{
2513        fld_client_debugfs_fini(&obd->u.lmv.lmv_fld);
2514        lprocfs_obd_cleanup(obd);
2515        return 0;
2516}
2517
2518/**
2519 * Get by key a value associated with a LMV device.
2520 *
2521 * Dispatch request to lower-layer devices as needed.
2522 *
2523 * \param[in]  env      execution environment for this thread
2524 * \param[in]  exp      export for the LMV device
2525 * \param[in]  keylen   length of key identifier
2526 * \param[in]  key      identifier of key to get value for
2527 * \param[in]  vallen   size of \a val
2528 * \param[out] val      pointer to storage location for value
2529 *
2530 * \retval 0            on success
2531 * \retval negative     negated errno on failure
2532 */
2533static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2534                        __u32 keylen, void *key, __u32 *vallen, void *val)
2535{
2536        struct obd_device       *obd;
2537        struct lmv_obd    *lmv;
2538        int                   rc = 0;
2539
2540        obd = class_exp2obd(exp);
2541        if (!obd) {
2542                CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2543                       exp->exp_handle.h_cookie);
2544                return -EINVAL;
2545        }
2546
2547        lmv = &obd->u.lmv;
2548        if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2549                int i;
2550
2551                LASSERT(*vallen == sizeof(__u32));
2552                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2553                        struct lmv_tgt_desc *tgt = lmv->tgts[i];
2554
2555                        /*
2556                         * All tgts should be connected when this gets called.
2557                         */
2558                        if (!tgt || !tgt->ltd_exp)
2559                                continue;
2560
2561                        if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2562                                          vallen, val))
2563                                return 0;
2564                }
2565                return -EINVAL;
2566        } else if (KEY_IS(KEY_MAX_EASIZE) ||
2567                   KEY_IS(KEY_DEFAULT_EASIZE) ||
2568                   KEY_IS(KEY_CONN_DATA)) {
2569                /*
2570                 * Forwarding this request to first MDS, it should know LOV
2571                 * desc.
2572                 */
2573                rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2574                                  vallen, val);
2575                if (!rc && KEY_IS(KEY_CONN_DATA))
2576                        exp->exp_connect_data = *(struct obd_connect_data *)val;
2577                return rc;
2578        } else if (KEY_IS(KEY_TGT_COUNT)) {
2579                *((int *)val) = lmv->desc.ld_tgt_count;
2580                return 0;
2581        }
2582
2583        CDEBUG(D_IOCTL, "Invalid key\n");
2584        return -EINVAL;
2585}
2586
2587/**
2588 * Asynchronously set by key a value associated with a LMV device.
2589 *
2590 * Dispatch request to lower-layer devices as needed.
2591 *
2592 * \param[in] env       execution environment for this thread
2593 * \param[in] exp       export for the LMV device
2594 * \param[in] keylen    length of key identifier
2595 * \param[in] key       identifier of key to store value for
2596 * \param[in] vallen    size of value to store
2597 * \param[in] val       pointer to data to be stored
2598 * \param[in] set       optional list of related ptlrpc requests
2599 *
2600 * \retval 0            on success
2601 * \retval negative     negated errno on failure
2602 */
2603static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2604                              u32 keylen, void *key, u32 vallen,
2605                              void *val, struct ptlrpc_request_set *set)
2606{
2607        struct lmv_tgt_desc    *tgt;
2608        struct obd_device      *obd;
2609        struct lmv_obd   *lmv;
2610        int rc = 0;
2611
2612        obd = class_exp2obd(exp);
2613        if (!obd) {
2614                CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2615                       exp->exp_handle.h_cookie);
2616                return -EINVAL;
2617        }
2618        lmv = &obd->u.lmv;
2619
2620        if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
2621            KEY_IS(KEY_DEFAULT_EASIZE)) {
2622                int i, err = 0;
2623
2624                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2625                        tgt = lmv->tgts[i];
2626
2627                        if (!tgt || !tgt->ltd_exp)
2628                                continue;
2629
2630                        err = obd_set_info_async(env, tgt->ltd_exp,
2631                                                 keylen, key, vallen, val, set);
2632                        if (err && rc == 0)
2633                                rc = err;
2634                }
2635
2636                return rc;
2637        }
2638
2639        return -EINVAL;
2640}
2641
2642static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
2643                            const struct lmv_mds_md_v1 *lmm1)
2644{
2645        struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2646        int stripe_count;
2647        int rc = 0;
2648        int cplen;
2649        int i;
2650
2651        lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
2652        lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2653        lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
2654        if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE))
2655                lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN;
2656        else
2657                lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
2658        lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
2659        cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
2660                        sizeof(lsm->lsm_md_pool_name));
2661
2662        if (cplen >= sizeof(lsm->lsm_md_pool_name))
2663                return -E2BIG;
2664
2665        CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d layout_version %d\n",
2666               lsm->lsm_md_stripe_count, lsm->lsm_md_master_mdt_index,
2667               lsm->lsm_md_hash_type, lsm->lsm_md_layout_version);
2668
2669        stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2670        for (i = 0; i < stripe_count; i++) {
2671                fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
2672                              &lmm1->lmv_stripe_fids[i]);
2673                rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
2674                                    &lsm->lsm_md_oinfo[i].lmo_mds);
2675                if (rc)
2676                        return rc;
2677                CDEBUG(D_INFO, "unpack fid #%d " DFID "\n", i,
2678                       PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
2679        }
2680
2681        return rc;
2682}
2683
2684static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
2685                        const union lmv_mds_md *lmm, size_t lmm_size)
2686{
2687        struct lmv_stripe_md *lsm;
2688        bool allocated = false;
2689        int lsm_size, rc;
2690
2691        LASSERT(lsmp);
2692
2693        lsm = *lsmp;
2694        /* Free memmd */
2695        if (lsm && !lmm) {
2696                int i;
2697
2698                for (i = 1; i < lsm->lsm_md_stripe_count; i++) {
2699                        /*
2700                         * For migrating inode, the master stripe and master
2701                         * object will be the same, so do not need iput, see
2702                         * ll_update_lsm_md
2703                         */
2704                        if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
2705                              !i) && lsm->lsm_md_oinfo[i].lmo_root)
2706                                iput(lsm->lsm_md_oinfo[i].lmo_root);
2707                }
2708
2709                kvfree(lsm);
2710                *lsmp = NULL;
2711                return 0;
2712        }
2713
2714        if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
2715                return -EPERM;
2716
2717        /* Unpack memmd */
2718        if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
2719            le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
2720                CERROR("%s: invalid lmv magic %x: rc = %d\n",
2721                       exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
2722                       -EIO);
2723                return -EIO;
2724        }
2725
2726        if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1)
2727                lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
2728        else
2729                /**
2730                 * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
2731                 * stripecount should be 0 then.
2732                 */
2733                lsm_size = lmv_stripe_md_size(0);
2734
2735        if (!lsm) {
2736                lsm = libcfs_kvzalloc(lsm_size, GFP_NOFS);
2737                if (!lsm)
2738                        return -ENOMEM;
2739                allocated = true;
2740                *lsmp = lsm;
2741        }
2742
2743        switch (le32_to_cpu(lmm->lmv_magic)) {
2744        case LMV_MAGIC_V1:
2745                rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
2746                break;
2747        default:
2748                CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
2749                       le32_to_cpu(lmm->lmv_magic));
2750                rc = -EINVAL;
2751                break;
2752        }
2753
2754        if (rc && allocated) {
2755                kvfree(lsm);
2756                *lsmp = NULL;
2757                lsm_size = rc;
2758        }
2759        return lsm_size;
2760}
2761
2762void lmv_free_memmd(struct lmv_stripe_md *lsm)
2763{
2764        lmv_unpackmd(NULL, &lsm, NULL, 0);
2765}
2766EXPORT_SYMBOL(lmv_free_memmd);
2767
2768static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2769                             union ldlm_policy_data *policy,
2770                             enum ldlm_mode mode, enum ldlm_cancel_flags flags,
2771                             void *opaque)
2772{
2773        struct obd_device       *obd = exp->exp_obd;
2774        struct lmv_obd    *lmv = &obd->u.lmv;
2775        int                   rc = 0;
2776        int                   err;
2777        u32 i;
2778
2779        LASSERT(fid);
2780
2781        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2782                struct lmv_tgt_desc *tgt = lmv->tgts[i];
2783
2784                if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
2785                        continue;
2786
2787                err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
2788                                       opaque);
2789                if (!rc)
2790                        rc = err;
2791        }
2792        return rc;
2793}
2794
2795static int lmv_set_lock_data(struct obd_export *exp,
2796                             const struct lustre_handle *lockh,
2797                             void *data, __u64 *bits)
2798{
2799        struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2800        struct lmv_tgt_desc *tgt = lmv->tgts[0];
2801
2802        if (!tgt || !tgt->ltd_exp)
2803                return -EINVAL;
2804
2805        return md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
2806}
2807
2808static enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags,
2809                                     const struct lu_fid *fid,
2810                                     enum ldlm_type type,
2811                                     union ldlm_policy_data *policy,
2812                                     enum ldlm_mode mode,
2813                                     struct lustre_handle *lockh)
2814{
2815        struct obd_device       *obd = exp->exp_obd;
2816        struct lmv_obd    *lmv = &obd->u.lmv;
2817        enum ldlm_mode        rc;
2818        int tgt;
2819        u32 i;
2820
2821        CDEBUG(D_INODE, "Lock match for " DFID "\n", PFID(fid));
2822
2823        /*
2824         * With DNE every object can have two locks in different namespaces:
2825         * lookup lock in space of MDT storing direntry and update/open lock in
2826         * space of MDT storing inode.  Try the MDT that the FID maps to first,
2827         * since this can be easily found, and only try others if that fails.
2828         */
2829        for (i = 0, tgt = lmv_find_target_index(lmv, fid);
2830             i < lmv->desc.ld_tgt_count;
2831             i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
2832                if (tgt < 0) {
2833                        CDEBUG(D_HA, "%s: " DFID " is inaccessible: rc = %d\n",
2834                               obd->obd_name, PFID(fid), tgt);
2835                        tgt = 0;
2836                }
2837
2838                if (!lmv->tgts[tgt] || !lmv->tgts[tgt]->ltd_exp ||
2839                    !lmv->tgts[tgt]->ltd_active)
2840                        continue;
2841
2842                rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid,
2843                                   type, policy, mode, lockh);
2844                if (rc)
2845                        return rc;
2846        }
2847
2848        return 0;
2849}
2850
2851static int lmv_get_lustre_md(struct obd_export *exp,
2852                             struct ptlrpc_request *req,
2853                             struct obd_export *dt_exp,
2854                             struct obd_export *md_exp,
2855                             struct lustre_md *md)
2856{
2857        struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2858        struct lmv_tgt_desc *tgt = lmv->tgts[0];
2859
2860        if (!tgt || !tgt->ltd_exp)
2861                return -EINVAL;
2862        return md_get_lustre_md(tgt->ltd_exp, req, dt_exp, md_exp, md);
2863}
2864
2865static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2866{
2867        struct obd_device       *obd = exp->exp_obd;
2868        struct lmv_obd    *lmv = &obd->u.lmv;
2869        struct lmv_tgt_desc *tgt = lmv->tgts[0];
2870
2871        if (md->lmv) {
2872                lmv_free_memmd(md->lmv);
2873                md->lmv = NULL;
2874        }
2875        if (!tgt || !tgt->ltd_exp)
2876                return -EINVAL;
2877        return md_free_lustre_md(tgt->ltd_exp, md);
2878}
2879
2880static int lmv_set_open_replay_data(struct obd_export *exp,
2881                                    struct obd_client_handle *och,
2882                                    struct lookup_intent *it)
2883{
2884        struct obd_device       *obd = exp->exp_obd;
2885        struct lmv_obd    *lmv = &obd->u.lmv;
2886        struct lmv_tgt_desc     *tgt;
2887
2888        tgt = lmv_find_target(lmv, &och->och_fid);
2889        if (IS_ERR(tgt))
2890                return PTR_ERR(tgt);
2891
2892        return md_set_open_replay_data(tgt->ltd_exp, och, it);
2893}
2894
2895static int lmv_clear_open_replay_data(struct obd_export *exp,
2896                                      struct obd_client_handle *och)
2897{
2898        struct obd_device       *obd = exp->exp_obd;
2899        struct lmv_obd    *lmv = &obd->u.lmv;
2900        struct lmv_tgt_desc     *tgt;
2901
2902        tgt = lmv_find_target(lmv, &och->och_fid);
2903        if (IS_ERR(tgt))
2904                return PTR_ERR(tgt);
2905
2906        return md_clear_open_replay_data(tgt->ltd_exp, och);
2907}
2908
2909static int lmv_intent_getattr_async(struct obd_export *exp,
2910                                    struct md_enqueue_info *minfo)
2911{
2912        struct md_op_data       *op_data = &minfo->mi_data;
2913        struct obd_device       *obd = exp->exp_obd;
2914        struct lmv_obd    *lmv = &obd->u.lmv;
2915        struct lmv_tgt_desc *ptgt = NULL;
2916        struct lmv_tgt_desc *ctgt = NULL;
2917
2918        if (!fid_is_sane(&op_data->op_fid2))
2919                return -EINVAL;
2920
2921        ptgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2922        if (IS_ERR(ptgt))
2923                return PTR_ERR(ptgt);
2924
2925        ctgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2926        if (IS_ERR(ctgt))
2927                return PTR_ERR(ctgt);
2928
2929        /*
2930         * if child is on remote MDT, we need 2 async RPCs to fetch both LOOKUP
2931         * lock on parent, and UPDATE lock on child MDT, which makes all
2932         * complicated. Considering remote dir is rare case, and not supporting
2933         * it in statahead won't cause any issue, drop its support for now.
2934         */
2935        if (ptgt != ctgt)
2936                return -ENOTSUPP;
2937
2938        return md_intent_getattr_async(ptgt->ltd_exp, minfo);
2939}
2940
2941static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2942                               struct lu_fid *fid, __u64 *bits)
2943{
2944        struct obd_device       *obd = exp->exp_obd;
2945        struct lmv_obd    *lmv = &obd->u.lmv;
2946        struct lmv_tgt_desc     *tgt;
2947
2948        tgt = lmv_find_target(lmv, fid);
2949        if (IS_ERR(tgt))
2950                return PTR_ERR(tgt);
2951
2952        return md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2953}
2954
2955static int
2956lmv_get_fid_from_lsm(struct obd_export *exp,
2957                     const struct lmv_stripe_md *lsm,
2958                     const char *name, int namelen, struct lu_fid *fid)
2959{
2960        const struct lmv_oinfo *oinfo;
2961
2962        LASSERT(lsm);
2963        oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
2964        if (IS_ERR(oinfo))
2965                return PTR_ERR(oinfo);
2966
2967        *fid = oinfo->lmo_fid;
2968
2969        return 0;
2970}
2971
2972/**
2973 * For lmv, only need to send request to master MDT, and the master MDT will
2974 * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2975 * we directly fetch data from the slave MDTs.
2976 */
2977static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2978                        struct obd_quotactl *oqctl)
2979{
2980        struct obd_device   *obd = class_exp2obd(exp);
2981        struct lmv_obd      *lmv = &obd->u.lmv;
2982        struct lmv_tgt_desc *tgt = lmv->tgts[0];
2983        int rc = 0;
2984        __u64 curspace = 0, curinodes = 0;
2985        u32 i;
2986
2987        if (!tgt || !tgt->ltd_exp || !tgt->ltd_active ||
2988            !lmv->desc.ld_tgt_count) {
2989                CERROR("master lmv inactive\n");
2990                return -EIO;
2991        }
2992
2993        if (oqctl->qc_cmd != Q_GETOQUOTA)
2994                return obd_quotactl(tgt->ltd_exp, oqctl);
2995
2996        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2997                int err;
2998
2999                tgt = lmv->tgts[i];
3000
3001                if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
3002                        continue;
3003
3004                err = obd_quotactl(tgt->ltd_exp, oqctl);
3005                if (err) {
3006                        CERROR("getquota on mdt %d failed. %d\n", i, err);
3007                        if (!rc)
3008                                rc = err;
3009                } else {
3010                        curspace += oqctl->qc_dqblk.dqb_curspace;
3011                        curinodes += oqctl->qc_dqblk.dqb_curinodes;
3012                }
3013        }
3014        oqctl->qc_dqblk.dqb_curspace = curspace;
3015        oqctl->qc_dqblk.dqb_curinodes = curinodes;
3016
3017        return rc;
3018}
3019
3020static int lmv_merge_attr(struct obd_export *exp,
3021                          const struct lmv_stripe_md *lsm,
3022                          struct cl_attr *attr,
3023                          ldlm_blocking_callback cb_blocking)
3024{
3025        int rc, i;
3026
3027        rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
3028        if (rc < 0)
3029                return rc;
3030
3031        for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
3032                struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
3033
3034                CDEBUG(D_INFO, "" DFID " size %llu, blocks %llu nlink %u, atime %lu ctime %lu, mtime %lu.\n",
3035                       PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
3036                       i_size_read(inode), (unsigned long long)inode->i_blocks,
3037                       inode->i_nlink, LTIME_S(inode->i_atime),
3038                       LTIME_S(inode->i_ctime), LTIME_S(inode->i_mtime));
3039
3040                /* for slave stripe, it needs to subtract nlink for . and .. */
3041                if (i)
3042                        attr->cat_nlink += inode->i_nlink - 2;
3043                else
3044                        attr->cat_nlink = inode->i_nlink;
3045
3046                attr->cat_size += i_size_read(inode);
3047                attr->cat_blocks += inode->i_blocks;
3048
3049                if (attr->cat_atime < LTIME_S(inode->i_atime))
3050                        attr->cat_atime = LTIME_S(inode->i_atime);
3051
3052                if (attr->cat_ctime < LTIME_S(inode->i_ctime))
3053                        attr->cat_ctime = LTIME_S(inode->i_ctime);
3054
3055                if (attr->cat_mtime < LTIME_S(inode->i_mtime))
3056                        attr->cat_mtime = LTIME_S(inode->i_mtime);
3057        }
3058        return 0;
3059}
3060
3061static struct obd_ops lmv_obd_ops = {
3062        .owner          = THIS_MODULE,
3063        .setup          = lmv_setup,
3064        .cleanup        = lmv_cleanup,
3065        .precleanup     = lmv_precleanup,
3066        .process_config = lmv_process_config,
3067        .connect        = lmv_connect,
3068        .disconnect     = lmv_disconnect,
3069        .statfs         = lmv_statfs,
3070        .get_info       = lmv_get_info,
3071        .set_info_async = lmv_set_info_async,
3072        .notify         = lmv_notify,
3073        .get_uuid       = lmv_get_uuid,
3074        .iocontrol      = lmv_iocontrol,
3075        .quotactl       = lmv_quotactl
3076};
3077
/*
 * Metadata operations of the LMV type; registered together with
 * lmv_obd_ops in lmv_init(). Each entry points at the LMV implementation
 * of the corresponding md_ops method.
 */
static struct md_ops lmv_md_ops = {
	.getstatus              = lmv_getstatus,
	.null_inode             = lmv_null_inode,
	.close                  = lmv_close,
	.create                 = lmv_create,
	.enqueue                = lmv_enqueue,
	.getattr                = lmv_getattr,
	.getxattr               = lmv_getxattr,
	.getattr_name           = lmv_getattr_name,
	.intent_lock            = lmv_intent_lock,
	.link                   = lmv_link,
	.rename                 = lmv_rename,
	.setattr                = lmv_setattr,
	.setxattr               = lmv_setxattr,
	.sync                   = lmv_sync,
	.read_page              = lmv_read_page,
	.unlink                 = lmv_unlink,
	.init_ea_size           = lmv_init_ea_size,
	.cancel_unused          = lmv_cancel_unused,
	.set_lock_data          = lmv_set_lock_data,
	.lock_match             = lmv_lock_match,
	.get_lustre_md          = lmv_get_lustre_md,
	.free_lustre_md         = lmv_free_lustre_md,
	.merge_attr             = lmv_merge_attr,
	.set_open_replay_data   = lmv_set_open_replay_data,
	.clear_open_replay_data = lmv_clear_open_replay_data,
	.intent_getattr_async   = lmv_intent_getattr_async,
	.revalidate_lock        = lmv_revalidate_lock,
	.get_fid_from_lsm       = lmv_get_fid_from_lsm,
	.unpackmd               = lmv_unpackmd,
};
3109
3110static int __init lmv_init(void)
3111{
3112        struct lprocfs_static_vars lvars;
3113
3114        lprocfs_lmv_init_vars(&lvars);
3115
3116        return class_register_type(&lmv_obd_ops, &lmv_md_ops,
3117                                 LUSTRE_LMV_NAME, NULL);
3118}
3119
3120static void lmv_exit(void)
3121{
3122        class_unregister_type(LUSTRE_LMV_NAME);
3123}
3124
3125MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3126MODULE_DESCRIPTION("Lustre Logical Metadata Volume");
3127MODULE_VERSION(LUSTRE_VERSION_STRING);
3128MODULE_LICENSE("GPL");
3129
3130module_init(lmv_init);
3131module_exit(lmv_exit);
3132