linux/drivers/staging/lustre/lustre/lmv/lmv_obd.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2015, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_LMV
  38#include <linux/slab.h>
  39#include <linux/module.h>
  40#include <linux/init.h>
  41#include <linux/pagemap.h>
  42#include <linux/mm.h>
  43#include <asm/div64.h>
  44#include <linux/seq_file.h>
  45#include <linux/namei.h>
  46#include <linux/uaccess.h>
  47
  48#include "../include/lustre/lustre_idl.h"
  49#include "../include/obd_support.h"
  50#include "../include/lustre_lib.h"
  51#include "../include/lustre_net.h"
  52#include "../include/obd_class.h"
  53#include "../include/lprocfs_status.h"
  54#include "../include/lustre_lite.h"
  55#include "../include/lustre_fid.h"
  56#include "../include/lustre_kernelcomm.h"
  57#include "lmv_internal.h"
  58
  59static void lmv_activate_target(struct lmv_obd *lmv,
  60                                struct lmv_tgt_desc *tgt,
  61                                int activate)
  62{
  63        if (tgt->ltd_active == activate)
  64                return;
  65
  66        tgt->ltd_active = activate;
  67        lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
  68}
  69
  70/**
  71 * Error codes:
  72 *
  73 *  -EINVAL  : UUID can't be found in the LMV's target list
  74 *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
  75 *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
  76 */
  77static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
  78                              int activate)
  79{
  80        struct lmv_tgt_desc    *uninitialized_var(tgt);
  81        struct obd_device      *obd;
  82        int                  i;
  83        int                  rc = 0;
  84
  85        CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
  86               lmv, uuid->uuid, activate);
  87
  88        spin_lock(&lmv->lmv_lock);
  89        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
  90                tgt = lmv->tgts[i];
  91                if (!tgt || !tgt->ltd_exp)
  92                        continue;
  93
  94                CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
  95                       tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
  96
  97                if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
  98                        break;
  99        }
 100
 101        if (i == lmv->desc.ld_tgt_count) {
 102                rc = -EINVAL;
 103                goto out_lmv_lock;
 104        }
 105
 106        obd = class_exp2obd(tgt->ltd_exp);
 107        if (!obd) {
 108                rc = -ENOTCONN;
 109                goto out_lmv_lock;
 110        }
 111
 112        CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
 113               obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
 114               obd->obd_type->typ_name, i);
 115        LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
 116
 117        if (tgt->ltd_active == activate) {
 118                CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
 119                       activate ? "" : "in");
 120                goto out_lmv_lock;
 121        }
 122
 123        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
 124               activate ? "" : "in");
 125        lmv_activate_target(lmv, tgt, activate);
 126
 127 out_lmv_lock:
 128        spin_unlock(&lmv->lmv_lock);
 129        return rc;
 130}
 131
 132static struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
 133{
 134        struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
 135
 136        return obd_get_uuid(lmv->tgts[0]->ltd_exp);
 137}
 138
 139static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
 140                      enum obd_notify_event ev, void *data)
 141{
 142        struct obd_connect_data *conn_data;
 143        struct lmv_obd    *lmv = &obd->u.lmv;
 144        struct obd_uuid  *uuid;
 145        int                   rc = 0;
 146
 147        if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
 148                CERROR("unexpected notification of %s %s!\n",
 149                       watched->obd_type->typ_name,
 150                       watched->obd_name);
 151                return -EINVAL;
 152        }
 153
 154        uuid = &watched->u.cli.cl_target_uuid;
 155        if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
 156                /*
 157                 * Set MDC as active before notifying the observer, so the
 158                 * observer can use the MDC normally.
 159                 */
 160                rc = lmv_set_mdc_active(lmv, uuid,
 161                                        ev == OBD_NOTIFY_ACTIVE);
 162                if (rc) {
 163                        CERROR("%sactivation of %s failed: %d\n",
 164                               ev == OBD_NOTIFY_ACTIVE ? "" : "de",
 165                               uuid->uuid, rc);
 166                        return rc;
 167                }
 168        } else if (ev == OBD_NOTIFY_OCD) {
 169                conn_data = &watched->u.cli.cl_import->imp_connect_data;
 170                /*
 171                 * XXX: Make sure that ocd_connect_flags from all targets are
 172                 * the same. Otherwise one of MDTs runs wrong version or
 173                 * something like this.  --umka
 174                 */
 175                obd->obd_self_export->exp_connect_data = *conn_data;
 176        }
 177#if 0
 178        else if (ev == OBD_NOTIFY_DISCON) {
 179                /*
 180                 * For disconnect event, flush fld cache for failout MDS case.
 181                 */
 182                fld_client_flush(&lmv->lmv_fld);
 183        }
 184#endif
 185        /*
 186         * Pass the notification up the chain.
 187         */
 188        if (obd->obd_observer)
 189                rc = obd_notify(obd->obd_observer, watched, ev, data);
 190
 191        return rc;
 192}
 193
 194/**
 195 * This is fake connect function. Its purpose is to initialize lmv and say
 196 * caller that everything is okay. Real connection will be performed later.
 197 */
 198static int lmv_connect(const struct lu_env *env,
 199                       struct obd_export **exp, struct obd_device *obd,
 200                       struct obd_uuid *cluuid, struct obd_connect_data *data,
 201                       void *localdata)
 202{
 203        struct lmv_obd  *lmv = &obd->u.lmv;
 204        struct lustre_handle  conn = { 0 };
 205        int                 rc = 0;
 206
 207        /*
 208         * We don't want to actually do the underlying connections more than
 209         * once, so keep track.
 210         */
 211        lmv->refcount++;
 212        if (lmv->refcount > 1) {
 213                *exp = NULL;
 214                return 0;
 215        }
 216
 217        rc = class_connect(&conn, obd, cluuid);
 218        if (rc) {
 219                CERROR("class_connection() returned %d\n", rc);
 220                return rc;
 221        }
 222
 223        *exp = class_conn2export(&conn);
 224        class_export_get(*exp);
 225
 226        lmv->exp = *exp;
 227        lmv->connected = 0;
 228        lmv->cluuid = *cluuid;
 229
 230        if (data)
 231                lmv->conn_data = *data;
 232
 233        lmv->lmv_tgts_kobj = kobject_create_and_add("target_obds",
 234                                                    &obd->obd_kobj);
 235        /*
 236         * All real clients should perform actual connection right away, because
 237         * it is possible, that LMV will not have opportunity to connect targets
 238         * and MDC stuff will be called directly, for instance while reading
 239         * ../mdc/../kbytesfree procfs file, etc.
 240         */
 241        if (data && data->ocd_connect_flags & OBD_CONNECT_REAL)
 242                rc = lmv_check_connect(obd);
 243
 244        if (rc && lmv->lmv_tgts_kobj)
 245                kobject_put(lmv->lmv_tgts_kobj);
 246
 247        return rc;
 248}
 249
 250static void lmv_set_timeouts(struct obd_device *obd)
 251{
 252        struct lmv_tgt_desc   *tgt;
 253        struct lmv_obd  *lmv;
 254        int                 i;
 255
 256        lmv = &obd->u.lmv;
 257        if (lmv->server_timeout == 0)
 258                return;
 259
 260        if (lmv->connected == 0)
 261                return;
 262
 263        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 264                tgt = lmv->tgts[i];
 265                if (!tgt || !tgt->ltd_exp || tgt->ltd_active == 0)
 266                        continue;
 267
 268                obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
 269                                   KEY_INTERMDS, 0, NULL, NULL);
 270        }
 271}
 272
 273static int lmv_init_ea_size(struct obd_export *exp, int easize,
 274                            int def_easize, int cookiesize, int def_cookiesize)
 275{
 276        struct obd_device   *obd = exp->exp_obd;
 277        struct lmv_obd      *lmv = &obd->u.lmv;
 278        int               i;
 279        int               rc = 0;
 280        int               change = 0;
 281
 282        if (lmv->max_easize < easize) {
 283                lmv->max_easize = easize;
 284                change = 1;
 285        }
 286        if (lmv->max_def_easize < def_easize) {
 287                lmv->max_def_easize = def_easize;
 288                change = 1;
 289        }
 290        if (lmv->max_cookiesize < cookiesize) {
 291                lmv->max_cookiesize = cookiesize;
 292                change = 1;
 293        }
 294        if (lmv->max_def_cookiesize < def_cookiesize) {
 295                lmv->max_def_cookiesize = def_cookiesize;
 296                change = 1;
 297        }
 298        if (change == 0)
 299                return 0;
 300
 301        if (lmv->connected == 0)
 302                return 0;
 303
 304        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 305                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp ||
 306                    lmv->tgts[i]->ltd_active == 0) {
 307                        CWARN("%s: NULL export for %d\n", obd->obd_name, i);
 308                        continue;
 309                }
 310
 311                rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
 312                                     cookiesize, def_cookiesize);
 313                if (rc) {
 314                        CERROR("%s: obd_init_ea_size() failed on MDT target %d: rc = %d\n",
 315                               obd->obd_name, i, rc);
 316                        break;
 317                }
 318        }
 319        return rc;
 320}
 321
 322#define MAX_STRING_SIZE 128
 323
 324static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 325{
 326        struct lmv_obd    *lmv = &obd->u.lmv;
 327        struct obd_uuid  *cluuid = &lmv->cluuid;
 328        struct obd_uuid   lmv_mdc_uuid = { "LMV_MDC_UUID" };
 329        struct obd_device       *mdc_obd;
 330        struct obd_export       *mdc_exp;
 331        struct lu_fld_target     target;
 332        int                   rc;
 333
 334        mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
 335                                        &obd->obd_uuid);
 336        if (!mdc_obd) {
 337                CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
 338                return -EINVAL;
 339        }
 340
 341        CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
 342               mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
 343               tgt->ltd_uuid.uuid, obd->obd_uuid.uuid, cluuid->uuid);
 344
 345        if (!mdc_obd->obd_set_up) {
 346                CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
 347                return -EINVAL;
 348        }
 349
 350        rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
 351                         &lmv->conn_data, NULL);
 352        if (rc) {
 353                CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
 354                return rc;
 355        }
 356
 357        /*
 358         * Init fid sequence client for this mdc and add new fld target.
 359         */
 360        rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
 361        if (rc)
 362                return rc;
 363
 364        target.ft_srv = NULL;
 365        target.ft_exp = mdc_exp;
 366        target.ft_idx = tgt->ltd_idx;
 367
 368        fld_client_add_target(&lmv->lmv_fld, &target);
 369
 370        rc = obd_register_observer(mdc_obd, obd);
 371        if (rc) {
 372                obd_disconnect(mdc_exp);
 373                CERROR("target %s register_observer error %d\n",
 374                       tgt->ltd_uuid.uuid, rc);
 375                return rc;
 376        }
 377
 378        if (obd->obd_observer) {
 379                /*
 380                 * Tell the observer about the new target.
 381                 */
 382                rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
 383                                OBD_NOTIFY_ACTIVE,
 384                                (void *)(tgt - lmv->tgts[0]));
 385                if (rc) {
 386                        obd_disconnect(mdc_exp);
 387                        return rc;
 388                }
 389        }
 390
 391        tgt->ltd_active = 1;
 392        tgt->ltd_exp = mdc_exp;
 393        lmv->desc.ld_active_tgt_count++;
 394
 395        md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize,
 396                        lmv->max_cookiesize, lmv->max_def_cookiesize);
 397
 398        CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
 399               mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
 400               atomic_read(&obd->obd_refcount));
 401
 402        if (lmv->lmv_tgts_kobj)
 403                /* Even if we failed to create the link, that's fine */
 404                rc = sysfs_create_link(lmv->lmv_tgts_kobj, &mdc_obd->obd_kobj,
 405                                       mdc_obd->obd_name);
 406        return 0;
 407}
 408
 409static void lmv_del_target(struct lmv_obd *lmv, int index)
 410{
 411        if (!lmv->tgts[index])
 412                return;
 413
 414        kfree(lmv->tgts[index]);
 415        lmv->tgts[index] = NULL;
 416        return;
 417}
 418
 419static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 420                          __u32 index, int gen)
 421{
 422        struct lmv_obd      *lmv = &obd->u.lmv;
 423        struct lmv_tgt_desc *tgt;
 424        int               rc = 0;
 425
 426        CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
 427
 428        lmv_init_lock(lmv);
 429
 430        if (lmv->desc.ld_tgt_count == 0) {
 431                struct obd_device *mdc_obd;
 432
 433                mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
 434                                                &obd->obd_uuid);
 435                if (!mdc_obd) {
 436                        lmv_init_unlock(lmv);
 437                        CERROR("%s: Target %s not attached: rc = %d\n",
 438                               obd->obd_name, uuidp->uuid, -EINVAL);
 439                        return -EINVAL;
 440                }
 441        }
 442
 443        if ((index < lmv->tgts_size) && lmv->tgts[index]) {
 444                tgt = lmv->tgts[index];
 445                CERROR("%s: UUID %s already assigned at LOV target index %d: rc = %d\n",
 446                       obd->obd_name,
 447                       obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
 448                lmv_init_unlock(lmv);
 449                return -EEXIST;
 450        }
 451
 452        if (index >= lmv->tgts_size) {
 453                /* We need to reallocate the lmv target array. */
 454                struct lmv_tgt_desc **newtgts, **old = NULL;
 455                __u32 newsize = 1;
 456                __u32 oldsize = 0;
 457
 458                while (newsize < index + 1)
 459                        newsize <<= 1;
 460                newtgts = kcalloc(newsize, sizeof(*newtgts), GFP_NOFS);
 461                if (!newtgts) {
 462                        lmv_init_unlock(lmv);
 463                        return -ENOMEM;
 464                }
 465
 466                if (lmv->tgts_size) {
 467                        memcpy(newtgts, lmv->tgts,
 468                               sizeof(*newtgts) * lmv->tgts_size);
 469                        old = lmv->tgts;
 470                        oldsize = lmv->tgts_size;
 471                }
 472
 473                lmv->tgts = newtgts;
 474                lmv->tgts_size = newsize;
 475                smp_rmb();
 476                kfree(old);
 477
 478                CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
 479                       lmv->tgts_size);
 480        }
 481
 482        tgt = kzalloc(sizeof(*tgt), GFP_NOFS);
 483        if (!tgt) {
 484                lmv_init_unlock(lmv);
 485                return -ENOMEM;
 486        }
 487
 488        mutex_init(&tgt->ltd_fid_mutex);
 489        tgt->ltd_idx = index;
 490        tgt->ltd_uuid = *uuidp;
 491        tgt->ltd_active = 0;
 492        lmv->tgts[index] = tgt;
 493        if (index >= lmv->desc.ld_tgt_count)
 494                lmv->desc.ld_tgt_count = index + 1;
 495
 496        if (lmv->connected) {
 497                rc = lmv_connect_mdc(obd, tgt);
 498                if (rc) {
 499                        spin_lock(&lmv->lmv_lock);
 500                        lmv->desc.ld_tgt_count--;
 501                        memset(tgt, 0, sizeof(*tgt));
 502                        spin_unlock(&lmv->lmv_lock);
 503                } else {
 504                        int easize = sizeof(struct lmv_stripe_md) +
 505                                lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
 506                        lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
 507                }
 508        }
 509
 510        lmv_init_unlock(lmv);
 511        return rc;
 512}
 513
 514int lmv_check_connect(struct obd_device *obd)
 515{
 516        struct lmv_obd       *lmv = &obd->u.lmv;
 517        struct lmv_tgt_desc  *tgt;
 518        int                i;
 519        int                rc;
 520        int                easize;
 521
 522        if (lmv->connected)
 523                return 0;
 524
 525        lmv_init_lock(lmv);
 526        if (lmv->connected) {
 527                lmv_init_unlock(lmv);
 528                return 0;
 529        }
 530
 531        if (lmv->desc.ld_tgt_count == 0) {
 532                lmv_init_unlock(lmv);
 533                CERROR("%s: no targets configured.\n", obd->obd_name);
 534                return -EINVAL;
 535        }
 536
 537        CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
 538               lmv->cluuid.uuid, obd->obd_name);
 539
 540        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 541                tgt = lmv->tgts[i];
 542                if (!tgt)
 543                        continue;
 544                rc = lmv_connect_mdc(obd, tgt);
 545                if (rc)
 546                        goto out_disc;
 547        }
 548
 549        lmv_set_timeouts(obd);
 550        class_export_put(lmv->exp);
 551        lmv->connected = 1;
 552        easize = lmv_get_easize(lmv);
 553        lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
 554        lmv_init_unlock(lmv);
 555        return 0;
 556
 557 out_disc:
 558        while (i-- > 0) {
 559                int rc2;
 560
 561                tgt = lmv->tgts[i];
 562                if (!tgt)
 563                        continue;
 564                tgt->ltd_active = 0;
 565                if (tgt->ltd_exp) {
 566                        --lmv->desc.ld_active_tgt_count;
 567                        rc2 = obd_disconnect(tgt->ltd_exp);
 568                        if (rc2) {
 569                                CERROR("LMV target %s disconnect on MDC idx %d: error %d\n",
 570                                       tgt->ltd_uuid.uuid, i, rc2);
 571                        }
 572                }
 573        }
 574        class_disconnect(lmv->exp);
 575        lmv_init_unlock(lmv);
 576        return rc;
 577}
 578
 579static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 580{
 581        struct lmv_obd   *lmv = &obd->u.lmv;
 582        struct obd_device      *mdc_obd;
 583        int                  rc;
 584
 585        mdc_obd = class_exp2obd(tgt->ltd_exp);
 586
 587        if (mdc_obd) {
 588                mdc_obd->obd_force = obd->obd_force;
 589                mdc_obd->obd_fail = obd->obd_fail;
 590                mdc_obd->obd_no_recov = obd->obd_no_recov;
 591
 592                if (lmv->lmv_tgts_kobj)
 593                        sysfs_remove_link(lmv->lmv_tgts_kobj,
 594                                          mdc_obd->obd_name);
 595        }
 596
 597        rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
 598        if (rc)
 599                CERROR("Can't finalize fids factory\n");
 600
 601        CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
 602               tgt->ltd_exp->exp_obd->obd_name,
 603               tgt->ltd_exp->exp_obd->obd_uuid.uuid);
 604
 605        obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
 606        rc = obd_disconnect(tgt->ltd_exp);
 607        if (rc) {
 608                if (tgt->ltd_active) {
 609                        CERROR("Target %s disconnect error %d\n",
 610                               tgt->ltd_uuid.uuid, rc);
 611                }
 612        }
 613
 614        lmv_activate_target(lmv, tgt, 0);
 615        tgt->ltd_exp = NULL;
 616        return 0;
 617}
 618
 619static int lmv_disconnect(struct obd_export *exp)
 620{
 621        struct obd_device     *obd = class_exp2obd(exp);
 622        struct lmv_obd  *lmv = &obd->u.lmv;
 623        int                 rc;
 624        int                 i;
 625
 626        if (!lmv->tgts)
 627                goto out_local;
 628
 629        /*
 630         * Only disconnect the underlying layers on the final disconnect.
 631         */
 632        lmv->refcount--;
 633        if (lmv->refcount != 0)
 634                goto out_local;
 635
 636        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 637                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
 638                        continue;
 639
 640                lmv_disconnect_mdc(obd, lmv->tgts[i]);
 641        }
 642
 643        if (lmv->lmv_tgts_kobj)
 644                kobject_put(lmv->lmv_tgts_kobj);
 645
 646out_local:
 647        /*
 648         * This is the case when no real connection is established by
 649         * lmv_check_connect().
 650         */
 651        if (!lmv->connected)
 652                class_export_put(exp);
 653        rc = class_disconnect(exp);
 654        if (lmv->refcount == 0)
 655                lmv->connected = 0;
 656        return rc;
 657}
 658
 659static int lmv_fid2path(struct obd_export *exp, int len, void *karg,
 660                        void __user *uarg)
 661{
 662        struct obd_device       *obddev = class_exp2obd(exp);
 663        struct lmv_obd          *lmv = &obddev->u.lmv;
 664        struct getinfo_fid2path *gf;
 665        struct lmv_tgt_desc     *tgt;
 666        struct getinfo_fid2path *remote_gf = NULL;
 667        int                     remote_gf_size = 0;
 668        int                     rc;
 669
 670        gf = (struct getinfo_fid2path *)karg;
 671        tgt = lmv_find_target(lmv, &gf->gf_fid);
 672        if (IS_ERR(tgt))
 673                return PTR_ERR(tgt);
 674
 675repeat_fid2path:
 676        rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
 677        if (rc != 0 && rc != -EREMOTE)
 678                goto out_fid2path;
 679
 680        /* If remote_gf != NULL, it means just building the
 681         * path on the remote MDT, copy this path segment to gf
 682         */
 683        if (remote_gf) {
 684                struct getinfo_fid2path *ori_gf;
 685                char *ptr;
 686
 687                ori_gf = (struct getinfo_fid2path *)karg;
 688                if (strlen(ori_gf->gf_path) +
 689                    strlen(gf->gf_path) > ori_gf->gf_pathlen) {
 690                        rc = -EOVERFLOW;
 691                        goto out_fid2path;
 692                }
 693
 694                ptr = ori_gf->gf_path;
 695
 696                memmove(ptr + strlen(gf->gf_path) + 1, ptr,
 697                        strlen(ori_gf->gf_path));
 698
 699                strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
 700                ptr += strlen(gf->gf_path);
 701                *ptr = '/';
 702        }
 703
 704        CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
 705               tgt->ltd_exp->exp_obd->obd_name,
 706               gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
 707               gf->gf_linkno);
 708
 709        if (rc == 0)
 710                goto out_fid2path;
 711
 712        /* sigh, has to go to another MDT to do path building further */
 713        if (!remote_gf) {
 714                remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
 715                remote_gf = kzalloc(remote_gf_size, GFP_NOFS);
 716                if (!remote_gf) {
 717                        rc = -ENOMEM;
 718                        goto out_fid2path;
 719                }
 720                remote_gf->gf_pathlen = PATH_MAX;
 721        }
 722
 723        if (!fid_is_sane(&gf->gf_fid)) {
 724                CERROR("%s: invalid FID "DFID": rc = %d\n",
 725                       tgt->ltd_exp->exp_obd->obd_name,
 726                       PFID(&gf->gf_fid), -EINVAL);
 727                rc = -EINVAL;
 728                goto out_fid2path;
 729        }
 730
 731        tgt = lmv_find_target(lmv, &gf->gf_fid);
 732        if (IS_ERR(tgt)) {
 733                rc = -EINVAL;
 734                goto out_fid2path;
 735        }
 736
 737        remote_gf->gf_fid = gf->gf_fid;
 738        remote_gf->gf_recno = -1;
 739        remote_gf->gf_linkno = -1;
 740        memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
 741        gf = remote_gf;
 742        goto repeat_fid2path;
 743
 744out_fid2path:
 745        kfree(remote_gf);
 746        return rc;
 747}
 748
 749static int lmv_hsm_req_count(struct lmv_obd *lmv,
 750                             const struct hsm_user_request *hur,
 751                             const struct lmv_tgt_desc *tgt_mds)
 752{
 753        int                     i, nr = 0;
 754        struct lmv_tgt_desc    *curr_tgt;
 755
 756        /* count how many requests must be sent to the given target */
 757        for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
 758                curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
 759                if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
 760                        nr++;
 761        }
 762        return nr;
 763}
 764
 765static void lmv_hsm_req_build(struct lmv_obd *lmv,
 766                              struct hsm_user_request *hur_in,
 767                              const struct lmv_tgt_desc *tgt_mds,
 768                              struct hsm_user_request *hur_out)
 769{
 770        int                     i, nr_out;
 771        struct lmv_tgt_desc    *curr_tgt;
 772
 773        /* build the hsm_user_request for the given target */
 774        hur_out->hur_request = hur_in->hur_request;
 775        nr_out = 0;
 776        for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
 777                curr_tgt = lmv_find_target(lmv,
 778                                           &hur_in->hur_user_item[i].hui_fid);
 779                if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
 780                        hur_out->hur_user_item[nr_out] =
 781                                hur_in->hur_user_item[i];
 782                        nr_out++;
 783                }
 784        }
 785        hur_out->hur_request.hr_itemcount = nr_out;
 786        memcpy(hur_data(hur_out), hur_data(hur_in),
 787               hur_in->hur_request.hr_data_len);
 788}
 789
 790static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
 791                                 struct lustre_kernelcomm *lk,
 792                                 void __user *uarg)
 793{
 794        int rc = 0;
 795        __u32 i;
 796
 797        /* unregister request (call from llapi_hsm_copytool_fini) */
 798        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 799                /* best effort: try to clean as much as possible
 800                 * (continue on error)
 801                 */
 802                obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
 803        }
 804
 805        /* Whatever the result, remove copytool from kuc groups.
 806         * Unreached coordinators will get EPIPE on next requests
 807         * and will unregister automatically.
 808         */
 809        rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
 810
 811        return rc;
 812}
 813
 814static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
 815                               struct lustre_kernelcomm *lk, void __user *uarg)
 816{
 817        struct file *filp;
 818        __u32 i, j;
 819        int err, rc = 0;
 820        bool any_set = false;
 821        struct kkuc_ct_data kcd = { 0 };
 822
 823        /* All or nothing: try to register to all MDS.
 824         * In case of failure, unregister from previous MDS,
 825         * except if it because of inactive target.
 826         */
 827        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
 828                err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
 829                if (err) {
 830                        if (lmv->tgts[i]->ltd_active) {
 831                                /* permanent error */
 832                                CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
 833                                       lmv->tgts[i]->ltd_uuid.uuid,
 834                                       i, cmd, err);
 835                                rc = err;
 836                                lk->lk_flags |= LK_FLG_STOP;
 837                                /* unregister from previous MDS */
 838                                for (j = 0; j < i; j++)
 839                                        obd_iocontrol(cmd,
 840                                                      lmv->tgts[j]->ltd_exp,
 841                                                      len, lk, uarg);
 842                                return rc;
 843                        }
 844                        /* else: transient error.
 845                         * kuc will register to the missing MDT when it is back
 846                         */
 847                } else {
 848                        any_set = true;
 849                }
 850        }
 851
 852        if (!any_set)
 853                /* no registration done: return error */
 854                return -ENOTCONN;
 855
 856        /* at least one registration done, with no failure */
 857        filp = fget(lk->lk_wfd);
 858        if (!filp)
 859                return -EBADF;
 860
 861        kcd.kcd_magic = KKUC_CT_DATA_MAGIC;
 862        kcd.kcd_uuid = lmv->cluuid;
 863        kcd.kcd_archive = lk->lk_data;
 864
 865        rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group,
 866                                   &kcd, sizeof(kcd));
 867        if (rc) {
 868                if (filp)
 869                        fput(filp);
 870        }
 871
 872        return rc;
 873}
 874
 875static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
 876                         int len, void *karg, void __user *uarg)
 877{
 878        struct obd_device    *obddev = class_exp2obd(exp);
 879        struct lmv_obd       *lmv = &obddev->u.lmv;
 880        int                i = 0;
 881        int                rc = 0;
 882        int                set = 0;
 883        int                count = lmv->desc.ld_tgt_count;
 884
 885        if (count == 0)
 886                return -ENOTTY;
 887
 888        switch (cmd) {
 889        case IOC_OBD_STATFS: {
 890                struct obd_ioctl_data *data = karg;
 891                struct obd_device *mdc_obd;
 892                struct obd_statfs stat_buf = {0};
 893                __u32 index;
 894
 895                memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
 896                if (index >= count)
 897                        return -ENODEV;
 898
 899                if (!lmv->tgts[index] || lmv->tgts[index]->ltd_active == 0)
 900                        return -ENODATA;
 901
 902                mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
 903                if (!mdc_obd)
 904                        return -EINVAL;
 905
 906                /* copy UUID */
 907                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
 908                                 min((int)data->ioc_plen2,
 909                                     (int)sizeof(struct obd_uuid))))
 910                        return -EFAULT;
 911
 912                rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
 913                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
 914                                0);
 915                if (rc)
 916                        return rc;
 917                if (copy_to_user(data->ioc_pbuf1, &stat_buf,
 918                                 min((int)data->ioc_plen1,
 919                                     (int)sizeof(stat_buf))))
 920                        return -EFAULT;
 921                break;
 922        }
 923        case OBD_IOC_QUOTACTL: {
 924                struct if_quotactl *qctl = karg;
 925                struct lmv_tgt_desc *tgt = NULL;
 926                struct obd_quotactl *oqctl;
 927
 928                if (qctl->qc_valid == QC_MDTIDX) {
 929                        if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
 930                                return -EINVAL;
 931
 932                        tgt = lmv->tgts[qctl->qc_idx];
 933                        if (!tgt || !tgt->ltd_exp)
 934                                return -EINVAL;
 935                } else if (qctl->qc_valid == QC_UUID) {
 936                        for (i = 0; i < count; i++) {
 937                                tgt = lmv->tgts[i];
 938                                if (!tgt)
 939                                        continue;
 940                                if (!obd_uuid_equals(&tgt->ltd_uuid,
 941                                                     &qctl->obd_uuid))
 942                                        continue;
 943
 944                                if (!tgt->ltd_exp)
 945                                        return -EINVAL;
 946
 947                                break;
 948                        }
 949                } else {
 950                        return -EINVAL;
 951                }
 952
 953                if (i >= count)
 954                        return -EAGAIN;
 955
 956                LASSERT(tgt && tgt->ltd_exp);
 957                oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
 958                if (!oqctl)
 959                        return -ENOMEM;
 960
 961                QCTL_COPY(oqctl, qctl);
 962                rc = obd_quotactl(tgt->ltd_exp, oqctl);
 963                if (rc == 0) {
 964                        QCTL_COPY(qctl, oqctl);
 965                        qctl->qc_valid = QC_MDTIDX;
 966                        qctl->obd_uuid = tgt->ltd_uuid;
 967                }
 968                kfree(oqctl);
 969                break;
 970        }
 971        case OBD_IOC_CHANGELOG_SEND:
 972        case OBD_IOC_CHANGELOG_CLEAR: {
 973                struct ioc_changelog *icc = karg;
 974
 975                if (icc->icc_mdtindex >= count)
 976                        return -ENODEV;
 977
 978                if (!lmv->tgts[icc->icc_mdtindex] ||
 979                    !lmv->tgts[icc->icc_mdtindex]->ltd_exp ||
 980                    lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
 981                        return -ENODEV;
 982                rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
 983                                   sizeof(*icc), icc, NULL);
 984                break;
 985        }
 986        case LL_IOC_GET_CONNECT_FLAGS: {
 987                if (!lmv->tgts[0])
 988                        return -ENODATA;
 989                rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
 990                break;
 991        }
 992        case OBD_IOC_FID2PATH: {
 993                rc = lmv_fid2path(exp, len, karg, uarg);
 994                break;
 995        }
 996        case LL_IOC_HSM_STATE_GET:
 997        case LL_IOC_HSM_STATE_SET:
 998        case LL_IOC_HSM_ACTION: {
 999                struct md_op_data       *op_data = karg;
1000                struct lmv_tgt_desc     *tgt;
1001
1002                tgt = lmv_find_target(lmv, &op_data->op_fid1);
1003                if (IS_ERR(tgt))
1004                        return PTR_ERR(tgt);
1005
1006                if (!tgt->ltd_exp)
1007                        return -EINVAL;
1008
1009                rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1010                break;
1011        }
1012        case LL_IOC_HSM_PROGRESS: {
1013                const struct hsm_progress_kernel *hpk = karg;
1014                struct lmv_tgt_desc     *tgt;
1015
1016                tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1017                if (IS_ERR(tgt))
1018                        return PTR_ERR(tgt);
1019                rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1020                break;
1021        }
1022        case LL_IOC_HSM_REQUEST: {
1023                struct hsm_user_request *hur = karg;
1024                struct lmv_tgt_desc     *tgt;
1025                unsigned int reqcount = hur->hur_request.hr_itemcount;
1026
1027                if (reqcount == 0)
1028                        return 0;
1029
1030                /* if the request is about a single fid
1031                 * or if there is a single MDS, no need to split
1032                 * the request.
1033                 */
1034                if (reqcount == 1 || count == 1) {
1035                        tgt = lmv_find_target(lmv,
1036                                              &hur->hur_user_item[0].hui_fid);
1037                        if (IS_ERR(tgt))
1038                                return PTR_ERR(tgt);
1039                        rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1040                } else {
1041                        /* split fid list to their respective MDS */
1042                        for (i = 0; i < count; i++) {
1043                                unsigned int            nr, reqlen;
1044                                int                     rc1;
1045                                struct hsm_user_request *req;
1046
1047                                nr = lmv_hsm_req_count(lmv, hur, lmv->tgts[i]);
1048                                if (nr == 0) /* nothing for this MDS */
1049                                        continue;
1050
1051                                /* build a request with fids for this MDS */
1052                                reqlen = offsetof(typeof(*hur),
1053                                                  hur_user_item[nr])
1054                                         + hur->hur_request.hr_data_len;
1055                                req = libcfs_kvzalloc(reqlen, GFP_NOFS);
1056                                if (!req)
1057                                        return -ENOMEM;
1058
1059                                lmv_hsm_req_build(lmv, hur, lmv->tgts[i], req);
1060
1061                                rc1 = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
1062                                                    reqlen, req, uarg);
1063                                if (rc1 != 0 && rc == 0)
1064                                        rc = rc1;
1065                                kvfree(req);
1066                        }
1067                }
1068                break;
1069        }
1070        case LL_IOC_LOV_SWAP_LAYOUTS: {
1071                struct md_op_data       *op_data = karg;
1072                struct lmv_tgt_desc     *tgt1, *tgt2;
1073
1074                tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1075                if (IS_ERR(tgt1))
1076                        return PTR_ERR(tgt1);
1077
1078                tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1079                if (IS_ERR(tgt2))
1080                        return PTR_ERR(tgt2);
1081
1082                if (!tgt1->ltd_exp || !tgt2->ltd_exp)
1083                        return -EINVAL;
1084
1085                /* only files on same MDT can have their layouts swapped */
1086                if (tgt1->ltd_idx != tgt2->ltd_idx)
1087                        return -EPERM;
1088
1089                rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1090                break;
1091        }
1092        case LL_IOC_HSM_CT_START: {
1093                struct lustre_kernelcomm *lk = karg;
1094
1095                if (lk->lk_flags & LK_FLG_STOP)
1096                        rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1097                else
1098                        rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1099                break;
1100        }
1101        default:
1102                for (i = 0; i < count; i++) {
1103                        struct obd_device *mdc_obd;
1104                        int err;
1105
1106                        if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1107                                continue;
1108                        /* ll_umount_begin() sets force flag but for lmv, not
1109                         * mdc. Let's pass it through
1110                         */
1111                        mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
1112                        mdc_obd->obd_force = obddev->obd_force;
1113                        err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
1114                                            karg, uarg);
1115                        if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1116                                return err;
1117                        } else if (err) {
1118                                if (lmv->tgts[i]->ltd_active) {
1119                                        CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
1120                                               lmv->tgts[i]->ltd_uuid.uuid,
1121                                               i, cmd, err);
1122                                        if (!rc)
1123                                                rc = err;
1124                                }
1125                        } else
1126                                set = 1;
1127                }
1128                if (!set && !rc)
1129                        rc = -EIO;
1130        }
1131        return rc;
1132}
1133
1134/**
1135 * This is _inode_ placement policy function (not name).
1136 */
1137static int lmv_placement_policy(struct obd_device *obd,
1138                                struct md_op_data *op_data, u32 *mds)
1139{
1140        struct lmv_obd    *lmv = &obd->u.lmv;
1141
1142        LASSERT(mds);
1143
1144        if (lmv->desc.ld_tgt_count == 1) {
1145                *mds = 0;
1146                return 0;
1147        }
1148
1149        /**
1150         * If stripe_offset is provided during setdirstripe
1151         * (setdirstripe -i xx), xx MDS will be chosen.
1152         */
1153        if (op_data->op_cli_flags & CLI_SET_MEA) {
1154                struct lmv_user_md *lum;
1155
1156                lum = (struct lmv_user_md *)op_data->op_data;
1157                if (lum->lum_type == LMV_STRIPE_TYPE &&
1158                    lum->lum_stripe_offset != -1) {
1159                        if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
1160                                CERROR("%s: Stripe_offset %d > MDT count %d: rc = %d\n",
1161                                       obd->obd_name,
1162                                       lum->lum_stripe_offset,
1163                                       lmv->desc.ld_tgt_count, -ERANGE);
1164                                return -ERANGE;
1165                        }
1166                        *mds = lum->lum_stripe_offset;
1167                        return 0;
1168                }
1169        }
1170
1171        /* Allocate new fid on target according to operation type and parent
1172         * home mds.
1173         */
1174        *mds = op_data->op_mds;
1175        return 0;
1176}
1177
1178int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
1179{
1180        struct lmv_tgt_desc     *tgt;
1181        int                      rc;
1182
1183        tgt = lmv_get_target(lmv, mds);
1184        if (IS_ERR(tgt))
1185                return PTR_ERR(tgt);
1186
1187        /*
1188         * New seq alloc and FLD setup should be atomic. Otherwise we may find
1189         * on server that seq in new allocated fid is not yet known.
1190         */
1191        mutex_lock(&tgt->ltd_fid_mutex);
1192
1193        if (tgt->ltd_active == 0 || !tgt->ltd_exp) {
1194                rc = -ENODEV;
1195                goto out;
1196        }
1197
1198        /*
1199         * Asking underlaying tgt layer to allocate new fid.
1200         */
1201        rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
1202        if (rc > 0) {
1203                LASSERT(fid_is_sane(fid));
1204                rc = 0;
1205        }
1206
1207out:
1208        mutex_unlock(&tgt->ltd_fid_mutex);
1209        return rc;
1210}
1211
1212int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1213                  struct md_op_data *op_data)
1214{
1215        struct obd_device     *obd = class_exp2obd(exp);
1216        struct lmv_obd  *lmv = &obd->u.lmv;
1217        u32                    mds = 0;
1218        int                 rc;
1219
1220        LASSERT(op_data);
1221        LASSERT(fid);
1222
1223        rc = lmv_placement_policy(obd, op_data, &mds);
1224        if (rc) {
1225                CERROR("Can't get target for allocating fid, rc %d\n",
1226                       rc);
1227                return rc;
1228        }
1229
1230        rc = __lmv_fid_alloc(lmv, fid, mds);
1231        if (rc) {
1232                CERROR("Can't alloc new fid, rc %d\n", rc);
1233                return rc;
1234        }
1235
1236        return rc;
1237}
1238
1239static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1240{
1241        struct lmv_obd       *lmv = &obd->u.lmv;
1242        struct lprocfs_static_vars  lvars = { NULL };
1243        struct lmv_desc     *desc;
1244        int                      rc;
1245
1246        if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1247                CERROR("LMV setup requires a descriptor\n");
1248                return -EINVAL;
1249        }
1250
1251        desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1252        if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1253                CERROR("Lmv descriptor size wrong: %d > %d\n",
1254                       (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1255                return -EINVAL;
1256        }
1257
1258        lmv->tgts = kcalloc(32, sizeof(*lmv->tgts), GFP_NOFS);
1259        if (!lmv->tgts)
1260                return -ENOMEM;
1261        lmv->tgts_size = 32;
1262
1263        obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1264        lmv->desc.ld_tgt_count = 0;
1265        lmv->desc.ld_active_tgt_count = 0;
1266        lmv->max_cookiesize = 0;
1267        lmv->max_def_easize = 0;
1268        lmv->max_easize = 0;
1269        lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1270
1271        spin_lock_init(&lmv->lmv_lock);
1272        mutex_init(&lmv->init_mutex);
1273
1274        lprocfs_lmv_init_vars(&lvars);
1275
1276        lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
1277        rc = ldebugfs_seq_create(obd->obd_debugfs_entry, "target_obd",
1278                                 0444, &lmv_proc_target_fops, obd);
1279        if (rc)
1280                CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1281                      obd->obd_name, rc);
1282        rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1283                             LUSTRE_CLI_FLD_HASH_DHT);
1284        if (rc) {
1285                CERROR("Can't init FLD, err %d\n", rc);
1286                goto out;
1287        }
1288
1289        return 0;
1290
1291out:
1292        return rc;
1293}
1294
1295static int lmv_cleanup(struct obd_device *obd)
1296{
1297        struct lmv_obd   *lmv = &obd->u.lmv;
1298
1299        fld_client_fini(&lmv->lmv_fld);
1300        if (lmv->tgts) {
1301                int i;
1302
1303                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1304                        if (!lmv->tgts[i])
1305                                continue;
1306                        lmv_del_target(lmv, i);
1307                }
1308                kfree(lmv->tgts);
1309                lmv->tgts_size = 0;
1310        }
1311        return 0;
1312}
1313
1314static int lmv_process_config(struct obd_device *obd, u32 len, void *buf)
1315{
1316        struct lustre_cfg       *lcfg = buf;
1317        struct obd_uuid         obd_uuid;
1318        int                     gen;
1319        __u32                   index;
1320        int                     rc;
1321
1322        switch (lcfg->lcfg_command) {
1323        case LCFG_ADD_MDC:
1324                /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1325                 * 2:0  3:1  4:lustre-MDT0000-mdc_UUID
1326                 */
1327                if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) {
1328                        rc = -EINVAL;
1329                        goto out;
1330                }
1331
1332                obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1333
1334                if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1335                        rc = -EINVAL;
1336                        goto out;
1337                }
1338                if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1) {
1339                        rc = -EINVAL;
1340                        goto out;
1341                }
1342                rc = lmv_add_target(obd, &obd_uuid, index, gen);
1343                goto out;
1344        default:
1345                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1346                rc = -EINVAL;
1347                goto out;
1348        }
1349out:
1350        return rc;
1351}
1352
1353static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1354                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1355{
1356        struct obd_device     *obd = class_exp2obd(exp);
1357        struct lmv_obd  *lmv = &obd->u.lmv;
1358        struct obd_statfs     *temp;
1359        int                 rc = 0;
1360        int                 i;
1361
1362        rc = lmv_check_connect(obd);
1363        if (rc)
1364                return rc;
1365
1366        temp = kzalloc(sizeof(*temp), GFP_NOFS);
1367        if (!temp)
1368                return -ENOMEM;
1369
1370        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1371                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1372                        continue;
1373
1374                rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1375                                max_age, flags);
1376                if (rc) {
1377                        CERROR("can't stat MDS #%d (%s), error %d\n", i,
1378                               lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1379                               rc);
1380                        goto out_free_temp;
1381                }
1382
1383                if (i == 0) {
1384                        *osfs = *temp;
1385                        /* If the statfs is from mount, it will needs
1386                         * retrieve necessary information from MDT0.
1387                         * i.e. mount does not need the merged osfs
1388                         * from all of MDT.
1389                         * And also clients can be mounted as long as
1390                         * MDT0 is in service
1391                         */
1392                        if (flags & OBD_STATFS_FOR_MDT0)
1393                                goto out_free_temp;
1394                } else {
1395                        osfs->os_bavail += temp->os_bavail;
1396                        osfs->os_blocks += temp->os_blocks;
1397                        osfs->os_ffree += temp->os_ffree;
1398                        osfs->os_files += temp->os_files;
1399                }
1400        }
1401
1402out_free_temp:
1403        kfree(temp);
1404        return rc;
1405}
1406
1407static int lmv_getstatus(struct obd_export *exp,
1408                         struct lu_fid *fid)
1409{
1410        struct obd_device    *obd = exp->exp_obd;
1411        struct lmv_obd       *lmv = &obd->u.lmv;
1412        int                rc;
1413
1414        rc = lmv_check_connect(obd);
1415        if (rc)
1416                return rc;
1417
1418        rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid);
1419        return rc;
1420}
1421
1422static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1423                        u64 valid, const char *name,
1424                        const char *input, int input_size, int output_size,
1425                        int flags, struct ptlrpc_request **request)
1426{
1427        struct obd_device      *obd = exp->exp_obd;
1428        struct lmv_obd   *lmv = &obd->u.lmv;
1429        struct lmv_tgt_desc    *tgt;
1430        int                  rc;
1431
1432        rc = lmv_check_connect(obd);
1433        if (rc)
1434                return rc;
1435
1436        tgt = lmv_find_target(lmv, fid);
1437        if (IS_ERR(tgt))
1438                return PTR_ERR(tgt);
1439
1440        rc = md_getxattr(tgt->ltd_exp, fid, valid, name, input,
1441                         input_size, output_size, flags, request);
1442
1443        return rc;
1444}
1445
1446static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1447                        u64 valid, const char *name,
1448                        const char *input, int input_size, int output_size,
1449                        int flags, __u32 suppgid,
1450                        struct ptlrpc_request **request)
1451{
1452        struct obd_device      *obd = exp->exp_obd;
1453        struct lmv_obd   *lmv = &obd->u.lmv;
1454        struct lmv_tgt_desc    *tgt;
1455        int                  rc;
1456
1457        rc = lmv_check_connect(obd);
1458        if (rc)
1459                return rc;
1460
1461        tgt = lmv_find_target(lmv, fid);
1462        if (IS_ERR(tgt))
1463                return PTR_ERR(tgt);
1464
1465        rc = md_setxattr(tgt->ltd_exp, fid, valid, name, input,
1466                         input_size, output_size, flags, suppgid,
1467                         request);
1468
1469        return rc;
1470}
1471
1472static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1473                       struct ptlrpc_request **request)
1474{
1475        struct obd_device       *obd = exp->exp_obd;
1476        struct lmv_obd    *lmv = &obd->u.lmv;
1477        struct lmv_tgt_desc     *tgt;
1478        int                   rc;
1479
1480        rc = lmv_check_connect(obd);
1481        if (rc)
1482                return rc;
1483
1484        tgt = lmv_find_target(lmv, &op_data->op_fid1);
1485        if (IS_ERR(tgt))
1486                return PTR_ERR(tgt);
1487
1488        if (op_data->op_flags & MF_GET_MDT_IDX) {
1489                op_data->op_mds = tgt->ltd_idx;
1490                return 0;
1491        }
1492
1493        rc = md_getattr(tgt->ltd_exp, op_data, request);
1494
1495        return rc;
1496}
1497
1498static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1499{
1500        struct obd_device   *obd = exp->exp_obd;
1501        struct lmv_obd      *lmv = &obd->u.lmv;
1502        int               i;
1503        int               rc;
1504
1505        rc = lmv_check_connect(obd);
1506        if (rc)
1507                return rc;
1508
1509        CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1510
1511        /*
1512         * With DNE every object can have two locks in different namespaces:
1513         * lookup lock in space of MDT storing direntry and update/open lock in
1514         * space of MDT storing inode.
1515         */
1516        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1517                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1518                        continue;
1519                md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1520        }
1521
1522        return 0;
1523}
1524
1525static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1526                           ldlm_iterator_t it, void *data)
1527{
1528        struct obd_device   *obd = exp->exp_obd;
1529        struct lmv_obd      *lmv = &obd->u.lmv;
1530        int               i;
1531        int               rc;
1532
1533        rc = lmv_check_connect(obd);
1534        if (rc)
1535                return rc;
1536
1537        CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1538
1539        /*
1540         * With DNE every object can have two locks in different namespaces:
1541         * lookup lock in space of MDT storing direntry and update/open lock in
1542         * space of MDT storing inode.
1543         */
1544        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1545                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1546                        continue;
1547                rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
1548                if (rc)
1549                        return rc;
1550        }
1551
1552        return rc;
1553}
1554
1555static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1556                     struct md_open_data *mod, struct ptlrpc_request **request)
1557{
1558        struct obd_device     *obd = exp->exp_obd;
1559        struct lmv_obd  *lmv = &obd->u.lmv;
1560        struct lmv_tgt_desc   *tgt;
1561        int                 rc;
1562
1563        rc = lmv_check_connect(obd);
1564        if (rc)
1565                return rc;
1566
1567        tgt = lmv_find_target(lmv, &op_data->op_fid1);
1568        if (IS_ERR(tgt))
1569                return PTR_ERR(tgt);
1570
1571        CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1572        rc = md_close(tgt->ltd_exp, op_data, mod, request);
1573        return rc;
1574}
1575
1576struct lmv_tgt_desc
1577*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1578                struct lu_fid *fid)
1579{
1580        struct lmv_tgt_desc *tgt;
1581
1582        tgt = lmv_find_target(lmv, fid);
1583        if (IS_ERR(tgt))
1584                return tgt;
1585
1586        op_data->op_mds = tgt->ltd_idx;
1587
1588        return tgt;
1589}
1590
1591static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1592                      const void *data, int datalen, int mode, __u32 uid,
1593                      __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1594                      struct ptlrpc_request **request)
1595{
1596        struct obd_device       *obd = exp->exp_obd;
1597        struct lmv_obd    *lmv = &obd->u.lmv;
1598        struct lmv_tgt_desc     *tgt;
1599        int                   rc;
1600
1601        rc = lmv_check_connect(obd);
1602        if (rc)
1603                return rc;
1604
1605        if (!lmv->desc.ld_active_tgt_count)
1606                return -EIO;
1607
1608        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1609        if (IS_ERR(tgt))
1610                return PTR_ERR(tgt);
1611
1612        rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1613        if (rc)
1614                return rc;
1615
1616        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1617               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1618               op_data->op_mds);
1619
1620        op_data->op_flags |= MF_MDC_CANCEL_FID1;
1621        rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1622                       cap_effective, rdev, request);
1623
1624        if (rc == 0) {
1625                if (!*request)
1626                        return rc;
1627                CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1628        }
1629        return rc;
1630}
1631
1632static int lmv_done_writing(struct obd_export *exp,
1633                            struct md_op_data *op_data,
1634                            struct md_open_data *mod)
1635{
1636        struct obd_device     *obd = exp->exp_obd;
1637        struct lmv_obd  *lmv = &obd->u.lmv;
1638        struct lmv_tgt_desc   *tgt;
1639        int                 rc;
1640
1641        rc = lmv_check_connect(obd);
1642        if (rc)
1643                return rc;
1644
1645        tgt = lmv_find_target(lmv, &op_data->op_fid1);
1646        if (IS_ERR(tgt))
1647                return PTR_ERR(tgt);
1648
1649        rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1650        return rc;
1651}
1652
1653static int
1654lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1655                   struct lookup_intent *it, struct md_op_data *op_data,
1656                   struct lustre_handle *lockh, void *lmm, int lmmsize,
1657                   __u64 extra_lock_flags)
1658{
1659        struct ptlrpc_request      *req = it->d.lustre.it_data;
1660        struct obd_device         *obd = exp->exp_obd;
1661        struct lmv_obd       *lmv = &obd->u.lmv;
1662        struct lustre_handle    plock;
1663        struct lmv_tgt_desc     *tgt;
1664        struct md_op_data         *rdata;
1665        struct lu_fid          fid1;
1666        struct mdt_body     *body;
1667        int                      rc = 0;
1668        int                      pmode;
1669
1670        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1671
1672        if (!(body->valid & OBD_MD_MDS))
1673                return 0;
1674
1675        CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1676               LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1677
1678        /*
1679         * We got LOOKUP lock, but we really need attrs.
1680         */
1681        pmode = it->d.lustre.it_lock_mode;
1682        LASSERT(pmode != 0);
1683        memcpy(&plock, lockh, sizeof(plock));
1684        it->d.lustre.it_lock_mode = 0;
1685        it->d.lustre.it_data = NULL;
1686        fid1 = body->fid1;
1687
1688        ptlrpc_req_finished(req);
1689
1690        tgt = lmv_find_target(lmv, &fid1);
1691        if (IS_ERR(tgt)) {
1692                rc = PTR_ERR(tgt);
1693                goto out;
1694        }
1695
1696        rdata = kzalloc(sizeof(*rdata), GFP_NOFS);
1697        if (!rdata) {
1698                rc = -ENOMEM;
1699                goto out;
1700        }
1701
1702        rdata->op_fid1 = fid1;
1703        rdata->op_bias = MDS_CROSS_REF;
1704
1705        rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1706                        lmm, lmmsize, NULL, extra_lock_flags);
1707        kfree(rdata);
1708out:
1709        ldlm_lock_decref(&plock, pmode);
1710        return rc;
1711}
1712
1713static int
1714lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1715            struct lookup_intent *it, struct md_op_data *op_data,
1716            struct lustre_handle *lockh, void *lmm, int lmmsize,
1717            struct ptlrpc_request **req, __u64 extra_lock_flags)
1718{
1719        struct obd_device       *obd = exp->exp_obd;
1720        struct lmv_obd     *lmv = &obd->u.lmv;
1721        struct lmv_tgt_desc      *tgt;
1722        int                    rc;
1723
1724        rc = lmv_check_connect(obd);
1725        if (rc)
1726                return rc;
1727
1728        CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1729               LL_IT2STR(it), PFID(&op_data->op_fid1));
1730
1731        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1732        if (IS_ERR(tgt))
1733                return PTR_ERR(tgt);
1734
1735        CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1736               LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1737
1738        rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1739                        lmm, lmmsize, req, extra_lock_flags);
1740
1741        if (rc == 0 && it && it->it_op == IT_OPEN) {
1742                rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1743                                        lmm, lmmsize, extra_lock_flags);
1744        }
1745        return rc;
1746}
1747
1748static int
1749lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
1750                 struct ptlrpc_request **request)
1751{
1752        struct ptlrpc_request   *req = NULL;
1753        struct obd_device       *obd = exp->exp_obd;
1754        struct lmv_obd    *lmv = &obd->u.lmv;
1755        struct lmv_tgt_desc     *tgt;
1756        struct mdt_body  *body;
1757        int                   rc;
1758
1759        rc = lmv_check_connect(obd);
1760        if (rc)
1761                return rc;
1762
1763        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1764        if (IS_ERR(tgt))
1765                return PTR_ERR(tgt);
1766
1767        CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1768               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1769               tgt->ltd_idx);
1770
1771        rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1772        if (rc != 0)
1773                return rc;
1774
1775        body = req_capsule_server_get(&(*request)->rq_pill,
1776                                      &RMF_MDT_BODY);
1777
1778        if (body->valid & OBD_MD_MDS) {
1779                struct lu_fid rid = body->fid1;
1780
1781                CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1782                       PFID(&rid));
1783
1784                tgt = lmv_find_target(lmv, &rid);
1785                if (IS_ERR(tgt)) {
1786                        ptlrpc_req_finished(*request);
1787                        return PTR_ERR(tgt);
1788                }
1789
1790                op_data->op_fid1 = rid;
1791                op_data->op_valid |= OBD_MD_FLCROSSREF;
1792                op_data->op_namelen = 0;
1793                op_data->op_name = NULL;
1794                rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1795                ptlrpc_req_finished(*request);
1796                *request = req;
1797        }
1798
1799        return rc;
1800}
1801
/*
 * Map one MF_MDC_CANCEL_FIDn flag value to the address of the matching
 * op_fidN member of @op_data; evaluates to NULL for any other value.
 *
 * Both arguments are parenthesized so arbitrary expressions may be passed.
 * @fl can be evaluated up to four times, so it must not have side effects.
 */
#define md_op_data_fid(op_data, fl)				\
	((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :	\
	 (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :	\
	 (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :	\
	 (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :	\
	 NULL)
1808
1809static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1810                            int op_tgt, enum ldlm_mode mode, int bits,
1811                            int flag)
1812{
1813        struct lu_fid     *fid = md_op_data_fid(op_data, flag);
1814        struct obd_device      *obd = exp->exp_obd;
1815        struct lmv_obd   *lmv = &obd->u.lmv;
1816        struct lmv_tgt_desc    *tgt;
1817        ldlm_policy_data_t      policy = { {0} };
1818        int                  rc = 0;
1819
1820        if (!fid_is_sane(fid))
1821                return 0;
1822
1823        tgt = lmv_find_target(lmv, fid);
1824        if (IS_ERR(tgt))
1825                return PTR_ERR(tgt);
1826
1827        if (tgt->ltd_idx != op_tgt) {
1828                CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1829                policy.l_inodebits.bits = bits;
1830                rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1831                                      mode, LCF_ASYNC, NULL);
1832        } else {
1833                CDEBUG(D_INODE,
1834                       "EARLY_CANCEL skip operation target %d on "DFID"\n",
1835                       op_tgt, PFID(fid));
1836                op_data->op_flags |= flag;
1837                rc = 0;
1838        }
1839
1840        return rc;
1841}
1842
1843/*
1844 * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1845 * op_data->op_fid2
1846 */
1847static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1848                    struct ptlrpc_request **request)
1849{
1850        struct obd_device       *obd = exp->exp_obd;
1851        struct lmv_obd    *lmv = &obd->u.lmv;
1852        struct lmv_tgt_desc     *tgt;
1853        int                   rc;
1854
1855        rc = lmv_check_connect(obd);
1856        if (rc)
1857                return rc;
1858
1859        LASSERT(op_data->op_namelen != 0);
1860
1861        CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1862               PFID(&op_data->op_fid2), op_data->op_namelen,
1863               op_data->op_name, PFID(&op_data->op_fid1));
1864
1865        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1866        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1867        op_data->op_cap = cfs_curproc_cap_pack();
1868        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1869        if (IS_ERR(tgt))
1870                return PTR_ERR(tgt);
1871
1872        /*
1873         * Cancel UPDATE lock on child (fid1).
1874         */
1875        op_data->op_flags |= MF_MDC_CANCEL_FID2;
1876        rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1877                              MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1878        if (rc != 0)
1879                return rc;
1880
1881        rc = md_link(tgt->ltd_exp, op_data, request);
1882
1883        return rc;
1884}
1885
1886static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1887                      const char *old, int oldlen, const char *new, int newlen,
1888                      struct ptlrpc_request **request)
1889{
1890        struct obd_device       *obd = exp->exp_obd;
1891        struct lmv_obd    *lmv = &obd->u.lmv;
1892        struct lmv_tgt_desc     *src_tgt;
1893        struct lmv_tgt_desc     *tgt_tgt;
1894        int                     rc;
1895
1896        LASSERT(oldlen != 0);
1897
1898        CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1899               oldlen, old, PFID(&op_data->op_fid1),
1900               newlen, new, PFID(&op_data->op_fid2));
1901
1902        rc = lmv_check_connect(obd);
1903        if (rc)
1904                return rc;
1905
1906        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1907        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1908        op_data->op_cap = cfs_curproc_cap_pack();
1909        src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1910        if (IS_ERR(src_tgt))
1911                return PTR_ERR(src_tgt);
1912
1913        tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1914        if (IS_ERR(tgt_tgt))
1915                return PTR_ERR(tgt_tgt);
1916        /*
1917         * LOOKUP lock on src child (fid3) should also be cancelled for
1918         * src_tgt in mdc_rename.
1919         */
1920        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1921
1922        /*
1923         * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1924         * own target.
1925         */
1926        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1927                              LCK_EX, MDS_INODELOCK_UPDATE,
1928                              MF_MDC_CANCEL_FID2);
1929
1930        /*
1931         * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
1932         */
1933        if (rc == 0) {
1934                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1935                                      LCK_EX, MDS_INODELOCK_LOOKUP,
1936                                      MF_MDC_CANCEL_FID4);
1937        }
1938
1939        /*
1940         * Cancel all the locks on tgt child (fid4).
1941         */
1942        if (rc == 0)
1943                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1944                                      LCK_EX, MDS_INODELOCK_FULL,
1945                                      MF_MDC_CANCEL_FID4);
1946
1947        if (rc == 0)
1948                rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
1949                               new, newlen, request);
1950        return rc;
1951}
1952
1953static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1954                       void *ea, int ealen, void *ea2, int ea2len,
1955                       struct ptlrpc_request **request,
1956                       struct md_open_data **mod)
1957{
1958        struct obd_device       *obd = exp->exp_obd;
1959        struct lmv_obd    *lmv = &obd->u.lmv;
1960        struct lmv_tgt_desc     *tgt;
1961        int                   rc;
1962
1963        rc = lmv_check_connect(obd);
1964        if (rc)
1965                return rc;
1966
1967        CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
1968               PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
1969
1970        op_data->op_flags |= MF_MDC_CANCEL_FID1;
1971        tgt = lmv_find_target(lmv, &op_data->op_fid1);
1972        if (IS_ERR(tgt))
1973                return PTR_ERR(tgt);
1974
1975        rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
1976                        ea2len, request, mod);
1977
1978        return rc;
1979}
1980
1981static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
1982                    struct ptlrpc_request **request)
1983{
1984        struct obd_device        *obd = exp->exp_obd;
1985        struct lmv_obd      *lmv = &obd->u.lmv;
1986        struct lmv_tgt_desc       *tgt;
1987        int                     rc;
1988
1989        rc = lmv_check_connect(obd);
1990        if (rc)
1991                return rc;
1992
1993        tgt = lmv_find_target(lmv, fid);
1994        if (IS_ERR(tgt))
1995                return PTR_ERR(tgt);
1996
1997        rc = md_sync(tgt->ltd_exp, fid, request);
1998        return rc;
1999}
2000
2001/*
2002 * Adjust a set of pages, each page containing an array of lu_dirpages,
2003 * so that each page can be used as a single logical lu_dirpage.
2004 *
2005 * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2006 * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2007 * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2008 * value is used as a cookie to request the next lu_dirpage in a
2009 * directory listing that spans multiple pages (two in this example):
2010 *   ________
2011 *  |   |
2012 * .|--------v-------   -----.
2013 * |s|e|f|p|ent|ent| ... |ent|
2014 * '--|--------------   -----'   Each CFS_PAGE contains a single
2015 *    '------.             lu_dirpage.
2016 * .---------v-------   -----.
2017 * |s|e|f|p|ent| 0 | ... | 0 |
2018 * '-----------------   -----'
2019 *
2020 * However, on hosts where the native VM page size (PAGE_SIZE) is
2021 * larger than LU_PAGE_SIZE, a single host page may contain multiple
2022 * lu_dirpages. After reading the lu_dirpages from the MDS, the
2023 * ldp_hash_end of the first lu_dirpage refers to the one immediately
2024 * after it in the same CFS_PAGE (arrows simplified for brevity, but
2025 * in general e0==s1, e1==s2, etc.):
2026 *
2027 * .--------------------   -----.
2028 * |s0|e0|f0|p|ent|ent| ... |ent|
2029 * |---v----------------   -----|
2030 * |s1|e1|f1|p|ent|ent| ... |ent|
2031 * |---v----------------   -----|  Here, each CFS_PAGE contains
2032 *           ...                 multiple lu_dirpages.
2033 * |---v----------------   -----|
2034 * |s'|e'|f'|p|ent|ent| ... |ent|
2035 * '---|----------------   -----'
2036 *     v
2037 * .----------------------------.
2038 * |    next CFS_PAGE       |
2039 *
2040 * This structure is transformed into a single logical lu_dirpage as follows:
2041 *
2042 * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2043 *   labeled 'next CFS_PAGE'.
2044 *
2045 * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2046 *   a hash collision with the next page exists.
2047 *
2048 * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2049 *   to the first entry of the next lu_dirpage.
2050 */
#if PAGE_SIZE > LU_PAGE_SIZE
/*
 * Stitch the lu_dirpages packed inside each host page into one logical
 * lu_dirpage per host page.
 *
 * @pages:   host pages holding the directory data
 * @ncfspgs: number of host pages to process
 * @nlupgs:  total number of LU_PAGE_SIZE lu_dirpages across those pages;
 *           counted down as lu_dirpages are consumed and asserted to reach
 *           exactly zero at the end
 */
static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
{
	int pg;

	for (pg = 0; pg < ncfspgs; pg++) {
		struct lu_dirpage *head = kmap(pages[pg]);
		struct lu_dirpage *cur = head;
		__u64 end_hash = head->ldp_hash_end;
		__u32 end_flags = head->ldp_flags;

		while (--nlupgs > 0) {
			struct lu_dirent *last = lu_dirent_start(cur);
			struct lu_dirent *ent;

			/* Walk to the final entry of the current lu_dirpage. */
			for (ent = last; ent; ent = lu_dirent_next(ent))
				last = ent;

			/* Step to the lu_dirpage that follows. */
			cur = (struct lu_dirpage *)((char *)cur + LU_PAGE_SIZE);

			/* Stop once the step left the current host page. */
			if (!((unsigned long)cur & ~CFS_PAGE_MASK))
				break;

			/* Remember the hash end and flags of this lu_dirpage. */
			end_hash = cur->ldp_hash_end;
			end_flags = cur->ldp_flags;

			/* The previous lu_dirpage had no entries at all. */
			if (!last)
				break;

			/* Stretch the final entry's lde_reclen (0 so far) up
			 * to the first entry of the following lu_dirpage.
			 */
			LASSERT(le16_to_cpu(last->lde_reclen) == 0);
			last->lde_reclen =
				cpu_to_le16((char *)(cur->ldp_entries) -
					    (char *)last);
		}

		/* The head page takes the hash end and LDF_COLLIDE bit of
		 * the last lu_dirpage visited in this host page.
		 */
		head->ldp_hash_end = end_hash;
		head->ldp_flags = (head->ldp_flags &
				   ~cpu_to_le32(LDF_COLLIDE)) |
				  (end_flags & cpu_to_le32(LDF_COLLIDE));

		kunmap(pages[pg]);
	}
	LASSERTF(nlupgs == 0, "left = %d", nlupgs);
}
#else
/* Host pages are not larger than lu_dirpages: nothing to adjust. */
#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
#endif	/* PAGE_SIZE > LU_PAGE_SIZE */
2105
2106static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
2107                        struct page **pages, struct ptlrpc_request **request)
2108{
2109        struct obd_device       *obd = exp->exp_obd;
2110        struct lmv_obd          *lmv = &obd->u.lmv;
2111        __u64                   offset = op_data->op_offset;
2112        int                     rc;
2113        int                     ncfspgs; /* pages read in PAGE_SIZE */
2114        int                     nlupgs; /* pages read in LU_PAGE_SIZE */
2115        struct lmv_tgt_desc     *tgt;
2116
2117        rc = lmv_check_connect(obd);
2118        if (rc)
2119                return rc;
2120
2121        CDEBUG(D_INODE, "READPAGE at %#llx from "DFID"\n",
2122               offset, PFID(&op_data->op_fid1));
2123
2124        tgt = lmv_find_target(lmv, &op_data->op_fid1);
2125        if (IS_ERR(tgt))
2126                return PTR_ERR(tgt);
2127
2128        rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
2129        if (rc != 0)
2130                return rc;
2131
2132        ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + PAGE_SIZE - 1)
2133                 >> PAGE_SHIFT;
2134        nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
2135        LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
2136        LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
2137
2138        CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
2139               op_data->op_npages);
2140
2141        lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
2142
2143        return rc;
2144}
2145
2146static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2147                      struct ptlrpc_request **request)
2148{
2149        struct obd_device       *obd = exp->exp_obd;
2150        struct lmv_obd    *lmv = &obd->u.lmv;
2151        struct lmv_tgt_desc     *tgt = NULL;
2152        struct mdt_body         *body;
2153        int                  rc;
2154
2155        rc = lmv_check_connect(obd);
2156        if (rc)
2157                return rc;
2158retry:
2159        /* Send unlink requests to the MDT where the child is located */
2160        if (likely(!fid_is_zero(&op_data->op_fid2)))
2161                tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2162        else
2163                tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2164        if (IS_ERR(tgt))
2165                return PTR_ERR(tgt);
2166
2167        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2168        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2169        op_data->op_cap = cfs_curproc_cap_pack();
2170
2171        /*
2172         * If child's fid is given, cancel unused locks for it if it is from
2173         * another export than parent.
2174         *
2175         * LOOKUP lock for child (fid3) should also be cancelled on parent
2176         * tgt_tgt in mdc_unlink().
2177         */
2178        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2179
2180        /*
2181         * Cancel FULL locks on child (fid3).
2182         */
2183        rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2184                              MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2185
2186        if (rc != 0)
2187                return rc;
2188
2189        CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2190               PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2191
2192        rc = md_unlink(tgt->ltd_exp, op_data, request);
2193        if (rc != 0 && rc != -EREMOTE)
2194                return rc;
2195
2196        body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2197        if (!body)
2198                return -EPROTO;
2199
2200        /* Not cross-ref case, just get out of here. */
2201        if (likely(!(body->valid & OBD_MD_MDS)))
2202                return 0;
2203
2204        CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2205               exp->exp_obd->obd_name, PFID(&body->fid1));
2206
2207        /* This is a remote object, try remote MDT, Note: it may
2208         * try more than 1 time here, Considering following case
2209         * /mnt/lustre is root on MDT0, remote1 is on MDT1
2210         * 1. Initially A does not know where remote1 is, it send
2211         *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2212         *    resend unlink RPC to MDT1 (retry 1st time).
2213         *
2214         * 2. During the unlink RPC in flight,
2215         *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2216         *    and create new remote1, but on MDT0
2217         *
2218         * 3. MDT1 get unlink RPC(from A), then do remote lock on
2219         *    /mnt/lustre, then lookup get fid of remote1, and find
2220         *    it is remote dir again, and replay -EREMOTE again.
2221         *
2222         * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2223         *
2224         * In theory, it might try unlimited time here, but it should
2225         * be very rare case.
2226         */
2227        op_data->op_fid2 = body->fid1;
2228        ptlrpc_req_finished(*request);
2229        *request = NULL;
2230
2231        goto retry;
2232}
2233
2234static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2235{
2236        struct lmv_obd *lmv = &obd->u.lmv;
2237
2238        switch (stage) {
2239        case OBD_CLEANUP_EARLY:
2240                /* XXX: here should be calling obd_precleanup() down to
2241                 * stack.
2242                 */
2243                break;
2244        case OBD_CLEANUP_EXPORTS:
2245                fld_client_debugfs_fini(&lmv->lmv_fld);
2246                lprocfs_obd_cleanup(obd);
2247                break;
2248        default:
2249                break;
2250        }
2251        return 0;
2252}
2253
2254static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2255                        __u32 keylen, void *key, __u32 *vallen, void *val,
2256                        struct lov_stripe_md *lsm)
2257{
2258        struct obd_device       *obd;
2259        struct lmv_obd    *lmv;
2260        int                   rc = 0;
2261
2262        obd = class_exp2obd(exp);
2263        if (!obd) {
2264                CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2265                       exp->exp_handle.h_cookie);
2266                return -EINVAL;
2267        }
2268
2269        lmv = &obd->u.lmv;
2270        if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2271                struct lmv_tgt_desc *tgt;
2272                int i;
2273
2274                rc = lmv_check_connect(obd);
2275                if (rc)
2276                        return rc;
2277
2278                LASSERT(*vallen == sizeof(__u32));
2279                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2280                        tgt = lmv->tgts[i];
2281                        /*
2282                         * All tgts should be connected when this gets called.
2283                         */
2284                        if (!tgt || !tgt->ltd_exp)
2285                                continue;
2286
2287                        if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2288                                          vallen, val, NULL))
2289                                return 0;
2290                }
2291                return -EINVAL;
2292        } else if (KEY_IS(KEY_MAX_EASIZE) ||
2293                   KEY_IS(KEY_DEFAULT_EASIZE) ||
2294                   KEY_IS(KEY_CONN_DATA)) {
2295                rc = lmv_check_connect(obd);
2296                if (rc)
2297                        return rc;
2298
2299                /*
2300                 * Forwarding this request to first MDS, it should know LOV
2301                 * desc.
2302                 */
2303                rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2304                                  vallen, val, NULL);
2305                if (!rc && KEY_IS(KEY_CONN_DATA))
2306                        exp->exp_connect_data = *(struct obd_connect_data *)val;
2307                return rc;
2308        } else if (KEY_IS(KEY_TGT_COUNT)) {
2309                *((int *)val) = lmv->desc.ld_tgt_count;
2310                return 0;
2311        }
2312
2313        CDEBUG(D_IOCTL, "Invalid key\n");
2314        return -EINVAL;
2315}
2316
2317static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2318                              u32 keylen, void *key, u32 vallen,
2319                              void *val, struct ptlrpc_request_set *set)
2320{
2321        struct lmv_tgt_desc    *tgt;
2322        struct obd_device      *obd;
2323        struct lmv_obd   *lmv;
2324        int rc = 0;
2325
2326        obd = class_exp2obd(exp);
2327        if (!obd) {
2328                CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2329                       exp->exp_handle.h_cookie);
2330                return -EINVAL;
2331        }
2332        lmv = &obd->u.lmv;
2333
2334        if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2335                int i, err = 0;
2336
2337                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2338                        tgt = lmv->tgts[i];
2339
2340                        if (!tgt || !tgt->ltd_exp)
2341                                continue;
2342
2343                        err = obd_set_info_async(env, tgt->ltd_exp,
2344                                                 keylen, key, vallen, val, set);
2345                        if (err && rc == 0)
2346                                rc = err;
2347                }
2348
2349                return rc;
2350        }
2351
2352        return -EINVAL;
2353}
2354
2355static int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2356                      struct lov_stripe_md *lsm)
2357{
2358        struct obd_device        *obd = class_exp2obd(exp);
2359        struct lmv_obd      *lmv = &obd->u.lmv;
2360        struct lmv_stripe_md      *meap;
2361        struct lmv_stripe_md      *lsmp;
2362        int                     mea_size;
2363        int                     i;
2364
2365        mea_size = lmv_get_easize(lmv);
2366        if (!lmmp)
2367                return mea_size;
2368
2369        if (*lmmp && !lsm) {
2370                kvfree(*lmmp);
2371                *lmmp = NULL;
2372                return 0;
2373        }
2374
2375        if (!*lmmp) {
2376                *lmmp = libcfs_kvzalloc(mea_size, GFP_NOFS);
2377                if (!*lmmp)
2378                        return -ENOMEM;
2379        }
2380
2381        if (!lsm)
2382                return mea_size;
2383
2384        lsmp = (struct lmv_stripe_md *)lsm;
2385        meap = (struct lmv_stripe_md *)*lmmp;
2386
2387        if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2388            lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2389                return -EINVAL;
2390
2391        meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2392        meap->mea_count = cpu_to_le32(lsmp->mea_count);
2393        meap->mea_master = cpu_to_le32(lsmp->mea_master);
2394
2395        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2396                meap->mea_ids[i] = lsmp->mea_ids[i];
2397                fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
2398        }
2399
2400        return mea_size;
2401}
2402
2403static int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2404                        struct lov_mds_md *lmm, int lmm_size)
2405{
2406        struct obd_device         *obd = class_exp2obd(exp);
2407        struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2408        struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2409        struct lmv_obd       *lmv = &obd->u.lmv;
2410        int                      mea_size;
2411        int                      i;
2412        __u32                  magic;
2413
2414        mea_size = lmv_get_easize(lmv);
2415        if (!lsmp)
2416                return mea_size;
2417
2418        if (*lsmp && !lmm) {
2419                kvfree(*tmea);
2420                *lsmp = NULL;
2421                return 0;
2422        }
2423
2424        LASSERT(mea_size == lmm_size);
2425
2426        *tmea = libcfs_kvzalloc(mea_size, GFP_NOFS);
2427        if (!*tmea)
2428                return -ENOMEM;
2429
2430        if (!lmm)
2431                return mea_size;
2432
2433        if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2434            mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2435            mea->mea_magic == MEA_MAGIC_HASH_SEGMENT) {
2436                magic = le32_to_cpu(mea->mea_magic);
2437        } else {
2438                /*
2439                 * Old mea is not handled here.
2440                 */
2441                CERROR("Old not supportable EA is found\n");
2442                LBUG();
2443        }
2444
2445        (*tmea)->mea_magic = magic;
2446        (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2447        (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2448
2449        for (i = 0; i < (*tmea)->mea_count; i++) {
2450                (*tmea)->mea_ids[i] = mea->mea_ids[i];
2451                fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2452        }
2453        return mea_size;
2454}
2455
2456static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2457                             ldlm_policy_data_t *policy, enum ldlm_mode mode,
2458                             enum ldlm_cancel_flags flags, void *opaque)
2459{
2460        struct obd_device       *obd = exp->exp_obd;
2461        struct lmv_obd    *lmv = &obd->u.lmv;
2462        int                   rc = 0;
2463        int                   err;
2464        int                   i;
2465
2466        LASSERT(fid);
2467
2468        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2469                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp ||
2470                    lmv->tgts[i]->ltd_active == 0)
2471                        continue;
2472
2473                err = md_cancel_unused(lmv->tgts[i]->ltd_exp, fid,
2474                                       policy, mode, flags, opaque);
2475                if (!rc)
2476                        rc = err;
2477        }
2478        return rc;
2479}
2480
2481static int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2482                             __u64 *bits)
2483{
2484        struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2485        int                   rc;
2486
2487        rc =  md_set_lock_data(lmv->tgts[0]->ltd_exp, lockh, data, bits);
2488        return rc;
2489}
2490
2491static enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags,
2492                                     const struct lu_fid *fid,
2493                                     enum ldlm_type type,
2494                                     ldlm_policy_data_t *policy,
2495                                     enum ldlm_mode mode,
2496                                     struct lustre_handle *lockh)
2497{
2498        struct obd_device       *obd = exp->exp_obd;
2499        struct lmv_obd    *lmv = &obd->u.lmv;
2500        enum ldlm_mode        rc;
2501        int                   i;
2502
2503        CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2504
2505        /*
2506         * With CMD every object can have two locks in different namespaces:
2507         * lookup lock in space of mds storing direntry and update/open lock in
2508         * space of mds storing inode. Thus we check all targets, not only that
2509         * one fid was created in.
2510         */
2511        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2512                if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp ||
2513                    lmv->tgts[i]->ltd_active == 0)
2514                        continue;
2515
2516                rc = md_lock_match(lmv->tgts[i]->ltd_exp, flags, fid,
2517                                   type, policy, mode, lockh);
2518                if (rc)
2519                        return rc;
2520        }
2521
2522        return 0;
2523}
2524
2525static int lmv_get_lustre_md(struct obd_export *exp,
2526                             struct ptlrpc_request *req,
2527                             struct obd_export *dt_exp,
2528                             struct obd_export *md_exp,
2529                             struct lustre_md *md)
2530{
2531        struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2532
2533        return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2534}
2535
2536static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2537{
2538        struct obd_device       *obd = exp->exp_obd;
2539        struct lmv_obd    *lmv = &obd->u.lmv;
2540
2541        if (md->mea)
2542                obd_free_memmd(exp, (void *)&md->mea);
2543        return md_free_lustre_md(lmv->tgts[0]->ltd_exp, md);
2544}
2545
2546static int lmv_set_open_replay_data(struct obd_export *exp,
2547                                    struct obd_client_handle *och,
2548                                    struct lookup_intent *it)
2549{
2550        struct obd_device       *obd = exp->exp_obd;
2551        struct lmv_obd    *lmv = &obd->u.lmv;
2552        struct lmv_tgt_desc     *tgt;
2553
2554        tgt = lmv_find_target(lmv, &och->och_fid);
2555        if (IS_ERR(tgt))
2556                return PTR_ERR(tgt);
2557
2558        return md_set_open_replay_data(tgt->ltd_exp, och, it);
2559}
2560
2561static int lmv_clear_open_replay_data(struct obd_export *exp,
2562                                      struct obd_client_handle *och)
2563{
2564        struct obd_device       *obd = exp->exp_obd;
2565        struct lmv_obd    *lmv = &obd->u.lmv;
2566        struct lmv_tgt_desc     *tgt;
2567
2568        tgt = lmv_find_target(lmv, &och->och_fid);
2569        if (IS_ERR(tgt))
2570                return PTR_ERR(tgt);
2571
2572        return md_clear_open_replay_data(tgt->ltd_exp, och);
2573}
2574
2575static int lmv_get_remote_perm(struct obd_export *exp,
2576                               const struct lu_fid *fid,
2577                               __u32 suppgid, struct ptlrpc_request **request)
2578{
2579        struct obd_device       *obd = exp->exp_obd;
2580        struct lmv_obd    *lmv = &obd->u.lmv;
2581        struct lmv_tgt_desc     *tgt;
2582        int                   rc;
2583
2584        rc = lmv_check_connect(obd);
2585        if (rc)
2586                return rc;
2587
2588        tgt = lmv_find_target(lmv, fid);
2589        if (IS_ERR(tgt))
2590                return PTR_ERR(tgt);
2591
2592        rc = md_get_remote_perm(tgt->ltd_exp, fid, suppgid, request);
2593        return rc;
2594}
2595
2596static int lmv_intent_getattr_async(struct obd_export *exp,
2597                                    struct md_enqueue_info *minfo,
2598                                    struct ldlm_enqueue_info *einfo)
2599{
2600        struct md_op_data       *op_data = &minfo->mi_data;
2601        struct obd_device       *obd = exp->exp_obd;
2602        struct lmv_obd    *lmv = &obd->u.lmv;
2603        struct lmv_tgt_desc     *tgt = NULL;
2604        int                   rc;
2605
2606        rc = lmv_check_connect(obd);
2607        if (rc)
2608                return rc;
2609
2610        tgt = lmv_find_target(lmv, &op_data->op_fid1);
2611        if (IS_ERR(tgt))
2612                return PTR_ERR(tgt);
2613
2614        rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2615        return rc;
2616}
2617
2618static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2619                               struct lu_fid *fid, __u64 *bits)
2620{
2621        struct obd_device       *obd = exp->exp_obd;
2622        struct lmv_obd    *lmv = &obd->u.lmv;
2623        struct lmv_tgt_desc     *tgt;
2624        int                   rc;
2625
2626        rc = lmv_check_connect(obd);
2627        if (rc)
2628                return rc;
2629
2630        tgt = lmv_find_target(lmv, fid);
2631        if (IS_ERR(tgt))
2632                return PTR_ERR(tgt);
2633
2634        rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2635        return rc;
2636}
2637
2638/**
2639 * For lmv, only need to send request to master MDT, and the master MDT will
2640 * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2641 * we directly fetch data from the slave MDTs.
2642 */
2643static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2644                        struct obd_quotactl *oqctl)
2645{
2646        struct obd_device   *obd = class_exp2obd(exp);
2647        struct lmv_obd      *lmv = &obd->u.lmv;
2648        struct lmv_tgt_desc *tgt = lmv->tgts[0];
2649        int               rc = 0, i;
2650        __u64           curspace, curinodes;
2651
2652        if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2653                CERROR("master lmv inactive\n");
2654                return -EIO;
2655        }
2656
2657        if (oqctl->qc_cmd != Q_GETOQUOTA) {
2658                rc = obd_quotactl(tgt->ltd_exp, oqctl);
2659                return rc;
2660        }
2661
2662        curspace = curinodes = 0;
2663        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2664                int err;
2665
2666                tgt = lmv->tgts[i];
2667
2668                if (!tgt || !tgt->ltd_exp || tgt->ltd_active == 0)
2669                        continue;
2670                if (!tgt->ltd_active) {
2671                        CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2672                        continue;
2673                }
2674
2675                err = obd_quotactl(tgt->ltd_exp, oqctl);
2676                if (err) {
2677                        CERROR("getquota on mdt %d failed. %d\n", i, err);
2678                        if (!rc)
2679                                rc = err;
2680                } else {
2681                        curspace += oqctl->qc_dqblk.dqb_curspace;
2682                        curinodes += oqctl->qc_dqblk.dqb_curinodes;
2683                }
2684        }
2685        oqctl->qc_dqblk.dqb_curspace = curspace;
2686        oqctl->qc_dqblk.dqb_curinodes = curinodes;
2687
2688        return rc;
2689}
2690
2691static int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2692                          struct obd_quotactl *oqctl)
2693{
2694        struct obd_device   *obd = class_exp2obd(exp);
2695        struct lmv_obd      *lmv = &obd->u.lmv;
2696        struct lmv_tgt_desc *tgt;
2697        int               i, rc = 0;
2698
2699        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2700                int err;
2701
2702                tgt = lmv->tgts[i];
2703                if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) {
2704                        CERROR("lmv idx %d inactive\n", i);
2705                        return -EIO;
2706                }
2707
2708                err = obd_quotacheck(tgt->ltd_exp, oqctl);
2709                if (err && !rc)
2710                        rc = err;
2711        }
2712
2713        return rc;
2714}
2715
2716static struct obd_ops lmv_obd_ops = {
2717        .owner          = THIS_MODULE,
2718        .setup          = lmv_setup,
2719        .cleanup        = lmv_cleanup,
2720        .precleanup     = lmv_precleanup,
2721        .process_config = lmv_process_config,
2722        .connect        = lmv_connect,
2723        .disconnect     = lmv_disconnect,
2724        .statfs         = lmv_statfs,
2725        .get_info       = lmv_get_info,
2726        .set_info_async = lmv_set_info_async,
2727        .packmd         = lmv_packmd,
2728        .unpackmd       = lmv_unpackmd,
2729        .notify         = lmv_notify,
2730        .get_uuid       = lmv_get_uuid,
2731        .iocontrol      = lmv_iocontrol,
2732        .quotacheck     = lmv_quotacheck,
2733        .quotactl       = lmv_quotactl
2734};
2735
2736static struct md_ops lmv_md_ops = {
2737        .getstatus              = lmv_getstatus,
2738        .null_inode             = lmv_null_inode,
2739        .find_cbdata            = lmv_find_cbdata,
2740        .close                  = lmv_close,
2741        .create                 = lmv_create,
2742        .done_writing           = lmv_done_writing,
2743        .enqueue                = lmv_enqueue,
2744        .getattr                = lmv_getattr,
2745        .getxattr               = lmv_getxattr,
2746        .getattr_name           = lmv_getattr_name,
2747        .intent_lock            = lmv_intent_lock,
2748        .link                   = lmv_link,
2749        .rename                 = lmv_rename,
2750        .setattr                = lmv_setattr,
2751        .setxattr               = lmv_setxattr,
2752        .sync                   = lmv_sync,
2753        .readpage               = lmv_readpage,
2754        .unlink                 = lmv_unlink,
2755        .init_ea_size           = lmv_init_ea_size,
2756        .cancel_unused          = lmv_cancel_unused,
2757        .set_lock_data          = lmv_set_lock_data,
2758        .lock_match             = lmv_lock_match,
2759        .get_lustre_md          = lmv_get_lustre_md,
2760        .free_lustre_md         = lmv_free_lustre_md,
2761        .set_open_replay_data   = lmv_set_open_replay_data,
2762        .clear_open_replay_data = lmv_clear_open_replay_data,
2763        .get_remote_perm        = lmv_get_remote_perm,
2764        .intent_getattr_async   = lmv_intent_getattr_async,
2765        .revalidate_lock        = lmv_revalidate_lock
2766};
2767
2768static int __init lmv_init(void)
2769{
2770        struct lprocfs_static_vars lvars;
2771        int                     rc;
2772
2773        lprocfs_lmv_init_vars(&lvars);
2774
2775        rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2776                                 LUSTRE_LMV_NAME, NULL);
2777        return rc;
2778}
2779
2780static void lmv_exit(void)
2781{
2782        class_unregister_type(LUSTRE_LMV_NAME);
2783}
2784
2785MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2786MODULE_DESCRIPTION("Lustre Logical Metadata Volume");
2787MODULE_VERSION(LUSTRE_VERSION_STRING);
2788MODULE_LICENSE("GPL");
2789
2790module_init(lmv_init);
2791module_exit(lmv_exit);
2792