linux/drivers/staging/lustre/lustre/lov/lov_obd.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/lov/lov_obd.c
  37 *
  38 * Author: Phil Schwan <phil@clusterfs.com>
  39 * Author: Peter Braam <braam@clusterfs.com>
  40 * Author: Mike Shaver <shaver@clusterfs.com>
  41 * Author: Nathan Rutman <nathan@clusterfs.com>
  42 */
  43
  44#define DEBUG_SUBSYSTEM S_LOV
  45#include "../../include/linux/libcfs/libcfs.h"
  46
  47#include "../include/obd_support.h"
  48#include "../include/lustre_lib.h"
  49#include "../include/lustre_net.h"
  50#include "../include/lustre/lustre_idl.h"
  51#include "../include/lustre_dlm.h"
  52#include "../include/lustre_mds.h"
  53#include "../include/obd_class.h"
  54#include "../include/lprocfs_status.h"
  55#include "../include/lustre_param.h"
  56#include "../include/cl_object.h"
  57#include "../include/lclient.h"         /* for cl_client_lru */
  58#include "../include/lustre/ll_fiemap.h"
  59#include "../include/lustre_fid.h"
  60
  61#include "lov_internal.h"
  62
  63/* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
  64   Any function that expects lov_tgts to remain stationary must take a ref. */
  65static void lov_getref(struct obd_device *obd)
  66{
  67        struct lov_obd *lov = &obd->u.lov;
  68
  69        /* nobody gets through here until lov_putref is done */
  70        mutex_lock(&lov->lov_lock);
  71        atomic_inc(&lov->lov_refcount);
  72        mutex_unlock(&lov->lov_lock);
  73        return;
  74}
  75
  76static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt);
  77
  78static void lov_putref(struct obd_device *obd)
  79{
  80        struct lov_obd *lov = &obd->u.lov;
  81
  82        mutex_lock(&lov->lov_lock);
  83        /* ok to dec to 0 more than once -- ltd_exp's will be null */
  84        if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
  85                LIST_HEAD(kill);
  86                int i;
  87                struct lov_tgt_desc *tgt, *n;
  88                CDEBUG(D_CONFIG, "destroying %d lov targets\n",
  89                       lov->lov_death_row);
  90                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
  91                        tgt = lov->lov_tgts[i];
  92
  93                        if (!tgt || !tgt->ltd_reap)
  94                                continue;
  95                        list_add(&tgt->ltd_kill, &kill);
  96                        /* XXX - right now there is a dependency on ld_tgt_count
  97                         * being the maximum tgt index for computing the
  98                         * mds_max_easize. So we can't shrink it. */
  99                        lov_ost_pool_remove(&lov->lov_packed, i);
 100                        lov->lov_tgts[i] = NULL;
 101                        lov->lov_death_row--;
 102                }
 103                mutex_unlock(&lov->lov_lock);
 104
 105                list_for_each_entry_safe(tgt, n, &kill, ltd_kill) {
 106                        list_del(&tgt->ltd_kill);
 107                        /* Disconnect */
 108                        __lov_del_obd(obd, tgt);
 109                }
 110        } else {
 111                mutex_unlock(&lov->lov_lock);
 112        }
 113}
 114
 115static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
 116                              enum obd_notify_event ev);
 117static int lov_notify(struct obd_device *obd, struct obd_device *watched,
 118                      enum obd_notify_event ev, void *data);
 119
 120
 121#define MAX_STRING_SIZE 128
 122int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
 123                    struct obd_connect_data *data)
 124{
 125        struct lov_obd *lov = &obd->u.lov;
 126        struct obd_uuid *tgt_uuid;
 127        struct obd_device *tgt_obd;
 128        static struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
 129        struct obd_import *imp;
 130        struct proc_dir_entry *lov_proc_dir;
 131        int rc;
 132
 133        if (!lov->lov_tgts[index])
 134                return -EINVAL;
 135
 136        tgt_uuid = &lov->lov_tgts[index]->ltd_uuid;
 137        tgt_obd = lov->lov_tgts[index]->ltd_obd;
 138
 139        if (!tgt_obd->obd_set_up) {
 140                CERROR("Target %s not set up\n", obd_uuid2str(tgt_uuid));
 141                return -EINVAL;
 142        }
 143
 144        /* override the sp_me from lov */
 145        tgt_obd->u.cli.cl_sp_me = lov->lov_sp_me;
 146
 147        if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
 148                data->ocd_index = index;
 149
 150        /*
 151         * Divine LOV knows that OBDs under it are OSCs.
 152         */
 153        imp = tgt_obd->u.cli.cl_import;
 154
 155        if (activate) {
 156                tgt_obd->obd_no_recov = 0;
 157                /* FIXME this is probably supposed to be
 158                   ptlrpc_set_import_active.  Horrible naming. */
 159                ptlrpc_activate_import(imp);
 160        }
 161
 162        rc = obd_register_observer(tgt_obd, obd);
 163        if (rc) {
 164                CERROR("Target %s register_observer error %d\n",
 165                       obd_uuid2str(tgt_uuid), rc);
 166                return rc;
 167        }
 168
 169
 170        if (imp->imp_invalid) {
 171                CDEBUG(D_CONFIG, "not connecting OSC %s; administratively disabled\n",
 172                       obd_uuid2str(tgt_uuid));
 173                return 0;
 174        }
 175
 176        rc = obd_connect(NULL, &lov->lov_tgts[index]->ltd_exp, tgt_obd,
 177                         &lov_osc_uuid, data, NULL);
 178        if (rc || !lov->lov_tgts[index]->ltd_exp) {
 179                CERROR("Target %s connect error %d\n",
 180                       obd_uuid2str(tgt_uuid), rc);
 181                return -ENODEV;
 182        }
 183
 184        lov->lov_tgts[index]->ltd_reap = 0;
 185
 186        CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
 187               obd_uuid2str(tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
 188
 189        lov_proc_dir = obd->obd_proc_private;
 190        if (lov_proc_dir) {
 191                struct obd_device *osc_obd = lov->lov_tgts[index]->ltd_exp->exp_obd;
 192                struct proc_dir_entry *osc_symlink;
 193
 194                LASSERT(osc_obd != NULL);
 195                LASSERT(osc_obd->obd_magic == OBD_DEVICE_MAGIC);
 196                LASSERT(osc_obd->obd_type->typ_name != NULL);
 197
 198                osc_symlink = lprocfs_add_symlink(osc_obd->obd_name,
 199                                                  lov_proc_dir,
 200                                                  "../../../%s/%s",
 201                                                  osc_obd->obd_type->typ_name,
 202                                                  osc_obd->obd_name);
 203                if (osc_symlink == NULL) {
 204                        CERROR("could not register LOV target /proc/fs/lustre/%s/%s/target_obds/%s.",
 205                               obd->obd_type->typ_name, obd->obd_name,
 206                               osc_obd->obd_name);
 207                        lprocfs_remove(&lov_proc_dir);
 208                        obd->obd_proc_private = NULL;
 209                }
 210        }
 211
 212        return 0;
 213}
 214
 215static int lov_connect(const struct lu_env *env,
 216                       struct obd_export **exp, struct obd_device *obd,
 217                       struct obd_uuid *cluuid, struct obd_connect_data *data,
 218                       void *localdata)
 219{
 220        struct lov_obd *lov = &obd->u.lov;
 221        struct lov_tgt_desc *tgt;
 222        struct lustre_handle conn;
 223        int i, rc;
 224
 225        CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
 226
 227        rc = class_connect(&conn, obd, cluuid);
 228        if (rc)
 229                return rc;
 230
 231        *exp = class_conn2export(&conn);
 232
 233        /* Why should there ever be more than 1 connect? */
 234        lov->lov_connects++;
 235        LASSERT(lov->lov_connects == 1);
 236
 237        memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
 238        if (data)
 239                lov->lov_ocd = *data;
 240
 241        obd_getref(obd);
 242        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 243                tgt = lov->lov_tgts[i];
 244                if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
 245                        continue;
 246                /* Flags will be lowest common denominator */
 247                rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
 248                if (rc) {
 249                        CERROR("%s: lov connect tgt %d failed: %d\n",
 250                               obd->obd_name, i, rc);
 251                        continue;
 252                }
 253                /* connect to administrative disabled ost */
 254                if (!lov->lov_tgts[i]->ltd_exp)
 255                        continue;
 256
 257                rc = lov_notify(obd, lov->lov_tgts[i]->ltd_exp->exp_obd,
 258                                OBD_NOTIFY_CONNECT, (void *)&i);
 259                if (rc) {
 260                        CERROR("%s error sending notify %d\n",
 261                               obd->obd_name, rc);
 262                }
 263        }
 264        obd_putref(obd);
 265
 266        return 0;
 267}
 268
 269static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
 270{
 271        struct proc_dir_entry *lov_proc_dir;
 272        struct lov_obd *lov = &obd->u.lov;
 273        struct obd_device *osc_obd;
 274        int rc;
 275
 276        osc_obd = class_exp2obd(tgt->ltd_exp);
 277        CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
 278                obd->obd_name, osc_obd ? osc_obd->obd_name : "NULL");
 279
 280        if (tgt->ltd_active) {
 281                tgt->ltd_active = 0;
 282                lov->desc.ld_active_tgt_count--;
 283                tgt->ltd_exp->exp_obd->obd_inactive = 1;
 284        }
 285
 286        if (osc_obd) {
 287                lov_proc_dir = obd->obd_proc_private;
 288                if (lov_proc_dir) {
 289                        lprocfs_remove_proc_entry(osc_obd->obd_name, lov_proc_dir);
 290                }
 291                /* Pass it on to our clients.
 292                 * XXX This should be an argument to disconnect,
 293                 * XXX not a back-door flag on the OBD.  Ah well.
 294                 */
 295                osc_obd->obd_force = obd->obd_force;
 296                osc_obd->obd_fail = obd->obd_fail;
 297                osc_obd->obd_no_recov = obd->obd_no_recov;
 298        }
 299
 300        obd_register_observer(osc_obd, NULL);
 301
 302        rc = obd_disconnect(tgt->ltd_exp);
 303        if (rc) {
 304                CERROR("Target %s disconnect error %d\n",
 305                       tgt->ltd_uuid.uuid, rc);
 306                rc = 0;
 307        }
 308
 309        tgt->ltd_exp = NULL;
 310        return 0;
 311}
 312
 313static int lov_disconnect(struct obd_export *exp)
 314{
 315        struct obd_device *obd = class_exp2obd(exp);
 316        struct lov_obd *lov = &obd->u.lov;
 317        int i, rc;
 318
 319        if (!lov->lov_tgts)
 320                goto out;
 321
 322        /* Only disconnect the underlying layers on the final disconnect. */
 323        lov->lov_connects--;
 324        if (lov->lov_connects != 0) {
 325                /* why should there be more than 1 connect? */
 326                CERROR("disconnect #%d\n", lov->lov_connects);
 327                goto out;
 328        }
 329
 330        /* Let's hold another reference so lov_del_obd doesn't spin through
 331           putref every time */
 332        obd_getref(obd);
 333
 334        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 335                if (lov->lov_tgts[i] && lov->lov_tgts[i]->ltd_exp) {
 336                        /* Disconnection is the last we know about an obd */
 337                        lov_del_target(obd, i, NULL, lov->lov_tgts[i]->ltd_gen);
 338                }
 339        }
 340        obd_putref(obd);
 341
 342out:
 343        rc = class_disconnect(exp); /* bz 9811 */
 344        return rc;
 345}
 346
 347/* Error codes:
 348 *
 349 *  -EINVAL  : UUID can't be found in the LOV's target list
 350 *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
 351 *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
 352 *  any >= 0 : is log target index
 353 */
 354static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
 355                              enum obd_notify_event ev)
 356{
 357        struct lov_obd *lov = &obd->u.lov;
 358        struct lov_tgt_desc *tgt;
 359        int index, activate, active;
 360
 361        CDEBUG(D_INFO, "Searching in lov %p for uuid %s event(%d)\n",
 362               lov, uuid->uuid, ev);
 363
 364        obd_getref(obd);
 365        for (index = 0; index < lov->desc.ld_tgt_count; index++) {
 366                tgt = lov->lov_tgts[index];
 367                if (!tgt)
 368                        continue;
 369                /*
 370                 * LU-642, initially inactive OSC could miss the obd_connect,
 371                 * we make up for it here.
 372                 */
 373                if (ev == OBD_NOTIFY_ACTIVATE && tgt->ltd_exp == NULL &&
 374                    obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
 375                        struct obd_uuid lov_osc_uuid = {"LOV_OSC_UUID"};
 376
 377                        obd_connect(NULL, &tgt->ltd_exp, tgt->ltd_obd,
 378                                    &lov_osc_uuid, &lov->lov_ocd, NULL);
 379                }
 380                if (!tgt->ltd_exp)
 381                        continue;
 382
 383                CDEBUG(D_INFO, "lov idx %d is %s conn %#llx\n",
 384                       index, obd_uuid2str(&tgt->ltd_uuid),
 385                       tgt->ltd_exp->exp_handle.h_cookie);
 386                if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
 387                        break;
 388        }
 389
 390        if (index == lov->desc.ld_tgt_count) {
 391                index = -EINVAL;
 392                goto out;
 393        }
 394
 395        if (ev == OBD_NOTIFY_DEACTIVATE || ev == OBD_NOTIFY_ACTIVATE) {
 396                activate = (ev == OBD_NOTIFY_ACTIVATE) ? 1 : 0;
 397
 398                if (lov->lov_tgts[index]->ltd_activate == activate) {
 399                        CDEBUG(D_INFO, "OSC %s already %sactivate!\n",
 400                               uuid->uuid, activate ? "" : "de");
 401                } else {
 402                        lov->lov_tgts[index]->ltd_activate = activate;
 403                        CDEBUG(D_CONFIG, "%sactivate OSC %s\n",
 404                               activate ? "" : "de", obd_uuid2str(uuid));
 405                }
 406
 407        } else if (ev == OBD_NOTIFY_INACTIVE || ev == OBD_NOTIFY_ACTIVE) {
 408                active = (ev == OBD_NOTIFY_ACTIVE) ? 1 : 0;
 409
 410                if (lov->lov_tgts[index]->ltd_active == active) {
 411                        CDEBUG(D_INFO, "OSC %s already %sactive!\n",
 412                               uuid->uuid, active ? "" : "in");
 413                        goto out;
 414                } else {
 415                        CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n",
 416                               obd_uuid2str(uuid), active ? "" : "in");
 417                }
 418
 419                lov->lov_tgts[index]->ltd_active = active;
 420                if (active) {
 421                        lov->desc.ld_active_tgt_count++;
 422                        lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
 423                } else {
 424                        lov->desc.ld_active_tgt_count--;
 425                        lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
 426                }
 427        } else {
 428                CERROR("Unknown event(%d) for uuid %s", ev, uuid->uuid);
 429        }
 430
 431 out:
 432        obd_putref(obd);
 433        return index;
 434}
 435
 436static int lov_notify(struct obd_device *obd, struct obd_device *watched,
 437                      enum obd_notify_event ev, void *data)
 438{
 439        int rc = 0;
 440        struct lov_obd *lov = &obd->u.lov;
 441
 442        down_read(&lov->lov_notify_lock);
 443        if (!lov->lov_connects) {
 444                up_read(&lov->lov_notify_lock);
 445                return rc;
 446        }
 447
 448        if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE ||
 449            ev == OBD_NOTIFY_ACTIVATE || ev == OBD_NOTIFY_DEACTIVATE) {
 450                struct obd_uuid *uuid;
 451
 452                LASSERT(watched);
 453
 454                if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
 455                        up_read(&lov->lov_notify_lock);
 456                        CERROR("unexpected notification of %s %s!\n",
 457                               watched->obd_type->typ_name,
 458                               watched->obd_name);
 459                        return -EINVAL;
 460                }
 461                uuid = &watched->u.cli.cl_target_uuid;
 462
 463                /* Set OSC as active before notifying the observer, so the
 464                 * observer can use the OSC normally.
 465                 */
 466                rc = lov_set_osc_active(obd, uuid, ev);
 467                if (rc < 0) {
 468                        up_read(&lov->lov_notify_lock);
 469                        CERROR("event(%d) of %s failed: %d\n", ev,
 470                               obd_uuid2str(uuid), rc);
 471                        return rc;
 472                }
 473                /* active event should be pass lov target index as data */
 474                data = &rc;
 475        }
 476
 477        /* Pass the notification up the chain. */
 478        if (watched) {
 479                rc = obd_notify_observer(obd, watched, ev, data);
 480        } else {
 481                /* NULL watched means all osc's in the lov (only for syncs) */
 482                /* sync event should be send lov idx as data */
 483                struct lov_obd *lov = &obd->u.lov;
 484                int i, is_sync;
 485
 486                data = &i;
 487                is_sync = (ev == OBD_NOTIFY_SYNC) ||
 488                          (ev == OBD_NOTIFY_SYNC_NONBLOCK);
 489
 490                obd_getref(obd);
 491                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 492                        if (!lov->lov_tgts[i])
 493                                continue;
 494
 495                        /* don't send sync event if target not
 496                         * connected/activated */
 497                        if (is_sync &&  !lov->lov_tgts[i]->ltd_active)
 498                                continue;
 499
 500                        rc = obd_notify_observer(obd, lov->lov_tgts[i]->ltd_obd,
 501                                                 ev, data);
 502                        if (rc) {
 503                                CERROR("%s: notify %s of %s failed %d\n",
 504                                       obd->obd_name,
 505                                       obd->obd_observer->obd_name,
 506                                       lov->lov_tgts[i]->ltd_obd->obd_name,
 507                                       rc);
 508                        }
 509                }
 510                obd_putref(obd);
 511        }
 512
 513        up_read(&lov->lov_notify_lock);
 514        return rc;
 515}
 516
 517static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 518                          __u32 index, int gen, int active)
 519{
 520        struct lov_obd *lov = &obd->u.lov;
 521        struct lov_tgt_desc *tgt;
 522        struct obd_device *tgt_obd;
 523        int rc;
 524
 525        CDEBUG(D_CONFIG, "uuid:%s idx:%d gen:%d active:%d\n",
 526               uuidp->uuid, index, gen, active);
 527
 528        if (gen <= 0) {
 529                CERROR("request to add OBD %s with invalid generation: %d\n",
 530                       uuidp->uuid, gen);
 531                return -EINVAL;
 532        }
 533
 534        tgt_obd = class_find_client_obd(uuidp, LUSTRE_OSC_NAME,
 535                                        &obd->obd_uuid);
 536        if (tgt_obd == NULL)
 537                return -EINVAL;
 538
 539        mutex_lock(&lov->lov_lock);
 540
 541        if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) {
 542                tgt = lov->lov_tgts[index];
 543                CERROR("UUID %s already assigned at LOV target index %d\n",
 544                       obd_uuid2str(&tgt->ltd_uuid), index);
 545                mutex_unlock(&lov->lov_lock);
 546                return -EEXIST;
 547        }
 548
 549        if (index >= lov->lov_tgt_size) {
 550                /* We need to reallocate the lov target array. */
 551                struct lov_tgt_desc **newtgts, **old = NULL;
 552                __u32 newsize, oldsize = 0;
 553
 554                newsize = max_t(__u32, lov->lov_tgt_size, 2);
 555                while (newsize < index + 1)
 556                        newsize <<= 1;
 557                OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
 558                if (newtgts == NULL) {
 559                        mutex_unlock(&lov->lov_lock);
 560                        return -ENOMEM;
 561                }
 562
 563                if (lov->lov_tgt_size) {
 564                        memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
 565                               lov->lov_tgt_size);
 566                        old = lov->lov_tgts;
 567                        oldsize = lov->lov_tgt_size;
 568                }
 569
 570                lov->lov_tgts = newtgts;
 571                lov->lov_tgt_size = newsize;
 572                smp_rmb();
 573                if (old)
 574                        OBD_FREE(old, sizeof(*old) * oldsize);
 575
 576                CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
 577                       lov->lov_tgts, lov->lov_tgt_size);
 578        }
 579
 580        OBD_ALLOC_PTR(tgt);
 581        if (!tgt) {
 582                mutex_unlock(&lov->lov_lock);
 583                return -ENOMEM;
 584        }
 585
 586        rc = lov_ost_pool_add(&lov->lov_packed, index, lov->lov_tgt_size);
 587        if (rc) {
 588                mutex_unlock(&lov->lov_lock);
 589                OBD_FREE_PTR(tgt);
 590                return rc;
 591        }
 592
 593        tgt->ltd_uuid = *uuidp;
 594        tgt->ltd_obd = tgt_obd;
 595        /* XXX - add a sanity check on the generation number. */
 596        tgt->ltd_gen = gen;
 597        tgt->ltd_index = index;
 598        tgt->ltd_activate = active;
 599        lov->lov_tgts[index] = tgt;
 600        if (index >= lov->desc.ld_tgt_count)
 601                lov->desc.ld_tgt_count = index + 1;
 602
 603        mutex_unlock(&lov->lov_lock);
 604
 605        CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
 606                index, tgt->ltd_gen, lov->desc.ld_tgt_count);
 607
 608        rc = obd_notify(obd, tgt_obd, OBD_NOTIFY_CREATE, &index);
 609
 610        if (lov->lov_connects == 0) {
 611                /* lov_connect hasn't been called yet. We'll do the
 612                   lov_connect_obd on this target when that fn first runs,
 613                   because we don't know the connect flags yet. */
 614                return 0;
 615        }
 616
 617        obd_getref(obd);
 618
 619        rc = lov_connect_obd(obd, index, active, &lov->lov_ocd);
 620        if (rc)
 621                goto out;
 622
 623        /* connect to administrative disabled ost */
 624        if (!tgt->ltd_exp) {
 625                rc = 0;
 626                goto out;
 627        }
 628
 629        if (lov->lov_cache != NULL) {
 630                rc = obd_set_info_async(NULL, tgt->ltd_exp,
 631                                sizeof(KEY_CACHE_SET), KEY_CACHE_SET,
 632                                sizeof(struct cl_client_cache), lov->lov_cache,
 633                                NULL);
 634                if (rc < 0)
 635                        goto out;
 636        }
 637
 638        rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
 639                        active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
 640                        (void *)&index);
 641
 642out:
 643        if (rc) {
 644                CERROR("add failed (%d), deleting %s\n", rc,
 645                       obd_uuid2str(&tgt->ltd_uuid));
 646                lov_del_target(obd, index, NULL, 0);
 647        }
 648        obd_putref(obd);
 649        return rc;
 650}
 651
 652/* Schedule a target for deletion */
 653int lov_del_target(struct obd_device *obd, __u32 index,
 654                   struct obd_uuid *uuidp, int gen)
 655{
 656        struct lov_obd *lov = &obd->u.lov;
 657        int count = lov->desc.ld_tgt_count;
 658        int rc = 0;
 659
 660        if (index >= count) {
 661                CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
 662                       index, count);
 663                return -EINVAL;
 664        }
 665
 666        /* to make sure there's no ongoing lov_notify() now */
 667        down_write(&lov->lov_notify_lock);
 668        obd_getref(obd);
 669
 670        if (!lov->lov_tgts[index]) {
 671                CERROR("LOV target at index %d is not setup.\n", index);
 672                rc = -EINVAL;
 673                goto out;
 674        }
 675
 676        if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
 677                CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
 678                       lov_uuid2str(lov, index), index,
 679                       obd_uuid2str(uuidp));
 680                rc = -EINVAL;
 681                goto out;
 682        }
 683
 684        CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
 685               lov_uuid2str(lov, index), index,
 686               lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
 687               lov->lov_tgts[index]->ltd_active);
 688
 689        lov->lov_tgts[index]->ltd_reap = 1;
 690        lov->lov_death_row++;
 691        /* we really delete it from obd_putref */
 692out:
 693        obd_putref(obd);
 694        up_write(&lov->lov_notify_lock);
 695
 696        return rc;
 697}
 698
 699static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
 700{
 701        struct obd_device *osc_obd;
 702
 703        LASSERT(tgt);
 704        LASSERT(tgt->ltd_reap);
 705
 706        osc_obd = class_exp2obd(tgt->ltd_exp);
 707
 708        CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
 709               tgt->ltd_uuid.uuid,
 710               osc_obd ? osc_obd->obd_name : "<no obd>");
 711
 712        if (tgt->ltd_exp)
 713                lov_disconnect_obd(obd, tgt);
 714
 715        OBD_FREE_PTR(tgt);
 716
 717        /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
 718           do it ourselves. And we can't do it from lov_cleanup,
 719           because we just lost our only reference to it. */
 720        if (osc_obd)
 721                class_manual_cleanup(osc_obd);
 722}
 723
 724void lov_fix_desc_stripe_size(__u64 *val)
 725{
 726        if (*val < LOV_MIN_STRIPE_SIZE) {
 727                if (*val != 0)
 728                        LCONSOLE_INFO("Increasing default stripe size to minimum %u\n",
 729                                      LOV_DESC_STRIPE_SIZE_DEFAULT);
 730                *val = LOV_DESC_STRIPE_SIZE_DEFAULT;
 731        } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
 732                *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
 733                LCONSOLE_WARN("Changing default stripe size to %llu (a multiple of %u)\n",
 734                              *val, LOV_MIN_STRIPE_SIZE);
 735        }
 736}
 737
 738void lov_fix_desc_stripe_count(__u32 *val)
 739{
 740        if (*val == 0)
 741                *val = 1;
 742}
 743
 744void lov_fix_desc_pattern(__u32 *val)
 745{
 746        /* from lov_setstripe */
 747        if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
 748                LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
 749                *val = 0;
 750        }
 751}
 752
 753void lov_fix_desc_qos_maxage(__u32 *val)
 754{
 755        if (*val == 0)
 756                *val = LOV_DESC_QOS_MAXAGE_DEFAULT;
 757}
 758
 759void lov_fix_desc(struct lov_desc *desc)
 760{
 761        lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
 762        lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
 763        lov_fix_desc_pattern(&desc->ld_pattern);
 764        lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
 765}
 766
 767int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 768{
 769        struct lprocfs_static_vars lvars = { NULL };
 770        struct lov_desc *desc;
 771        struct lov_obd *lov = &obd->u.lov;
 772        int rc;
 773
 774        if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
 775                CERROR("LOV setup requires a descriptor\n");
 776                return -EINVAL;
 777        }
 778
 779        desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
 780
 781        if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
 782                CERROR("descriptor size wrong: %d > %d\n",
 783                       (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
 784                return -EINVAL;
 785        }
 786
 787        if (desc->ld_magic != LOV_DESC_MAGIC) {
 788                if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
 789                            CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
 790                                   obd->obd_name, desc);
 791                            lustre_swab_lov_desc(desc);
 792                } else {
 793                        CERROR("%s: Bad lov desc magic: %#x\n",
 794                               obd->obd_name, desc->ld_magic);
 795                        return -EINVAL;
 796                }
 797        }
 798
 799        lov_fix_desc(desc);
 800
 801        desc->ld_active_tgt_count = 0;
 802        lov->desc = *desc;
 803        lov->lov_tgt_size = 0;
 804
 805        mutex_init(&lov->lov_lock);
 806        atomic_set(&lov->lov_refcount, 0);
 807        lov->lov_sp_me = LUSTRE_SP_CLI;
 808
 809        init_rwsem(&lov->lov_notify_lock);
 810
 811        lov->lov_pools_hash_body = cfs_hash_create("POOLS", HASH_POOLS_CUR_BITS,
 812                                                   HASH_POOLS_MAX_BITS,
 813                                                   HASH_POOLS_BKT_BITS, 0,
 814                                                   CFS_HASH_MIN_THETA,
 815                                                   CFS_HASH_MAX_THETA,
 816                                                   &pool_hash_operations,
 817                                                   CFS_HASH_DEFAULT);
 818        INIT_LIST_HEAD(&lov->lov_pool_list);
 819        lov->lov_pool_count = 0;
 820        rc = lov_ost_pool_init(&lov->lov_packed, 0);
 821        if (rc)
 822                goto out;
 823
 824        lprocfs_lov_init_vars(&lvars);
 825        lprocfs_obd_setup(obd, lvars.obd_vars);
 826#if defined (CONFIG_PROC_FS)
 827        {
 828                int rc1;
 829
 830                rc1 = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
 831                                        0444, &lov_proc_target_fops, obd);
 832                if (rc1)
 833                        CWARN("Error adding the target_obd file\n");
 834        }
 835#endif
 836        lov->lov_pool_proc_entry = lprocfs_register("pools",
 837                                                    obd->obd_proc_entry,
 838                                                    NULL, NULL);
 839
 840        return 0;
 841
 842out:
 843        return rc;
 844}
 845
 846static int lov_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 847{
 848        struct lov_obd *lov = &obd->u.lov;
 849
 850        switch (stage) {
 851        case OBD_CLEANUP_EARLY: {
 852                int i;
 853                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 854                        if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
 855                                continue;
 856                        obd_precleanup(class_exp2obd(lov->lov_tgts[i]->ltd_exp),
 857                                       OBD_CLEANUP_EARLY);
 858                }
 859                break;
 860        }
 861        default:
 862                break;
 863        }
 864
 865        return 0;
 866}
 867
 868static int lov_cleanup(struct obd_device *obd)
 869{
 870        struct lov_obd *lov = &obd->u.lov;
 871        struct list_head *pos, *tmp;
 872        struct pool_desc *pool;
 873
 874        list_for_each_safe(pos, tmp, &lov->lov_pool_list) {
 875                pool = list_entry(pos, struct pool_desc, pool_list);
 876                /* free pool structs */
 877                CDEBUG(D_INFO, "delete pool %p\n", pool);
 878                /* In the function below, .hs_keycmp resolves to
 879                 * pool_hashkey_keycmp() */
 880                /* coverity[overrun-buffer-val] */
 881                lov_pool_del(obd, pool->pool_name);
 882        }
 883        cfs_hash_putref(lov->lov_pools_hash_body);
 884        lov_ost_pool_free(&lov->lov_packed);
 885
 886        lprocfs_obd_cleanup(obd);
 887        if (lov->lov_tgts) {
 888                int i;
 889                obd_getref(obd);
 890                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 891                        if (!lov->lov_tgts[i])
 892                                continue;
 893
 894                        /* Inactive targets may never have connected */
 895                        if (lov->lov_tgts[i]->ltd_active ||
 896                            atomic_read(&lov->lov_refcount))
 897                            /* We should never get here - these
 898                               should have been removed in the
 899                             disconnect. */
 900                                CERROR("lov tgt %d not cleaned! deathrow=%d, lovrc=%d\n",
 901                                       i, lov->lov_death_row,
 902                                       atomic_read(&lov->lov_refcount));
 903                        lov_del_target(obd, i, NULL, 0);
 904                }
 905                obd_putref(obd);
 906                OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
 907                         lov->lov_tgt_size);
 908                lov->lov_tgt_size = 0;
 909        }
 910        return 0;
 911}
 912
 913int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
 914                            __u32 *indexp, int *genp)
 915{
 916        struct obd_uuid obd_uuid;
 917        int cmd;
 918        int rc = 0;
 919
 920        switch (cmd = lcfg->lcfg_command) {
 921        case LCFG_LOV_ADD_OBD:
 922        case LCFG_LOV_ADD_INA:
 923        case LCFG_LOV_DEL_OBD: {
 924                __u32 index;
 925                int gen;
 926                /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
 927                if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) {
 928                        rc = -EINVAL;
 929                        goto out;
 930                }
 931
 932                obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
 933
 934                if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", indexp) != 1) {
 935                        rc = -EINVAL;
 936                        goto out;
 937                }
 938                if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", genp) != 1) {
 939                        rc = -EINVAL;
 940                        goto out;
 941                }
 942                index = *indexp;
 943                gen = *genp;
 944                if (cmd == LCFG_LOV_ADD_OBD)
 945                        rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
 946                else if (cmd == LCFG_LOV_ADD_INA)
 947                        rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
 948                else
 949                        rc = lov_del_target(obd, index, &obd_uuid, gen);
 950                goto out;
 951        }
 952        case LCFG_PARAM: {
 953                struct lprocfs_static_vars lvars = { NULL };
 954                struct lov_desc *desc = &(obd->u.lov.desc);
 955
 956                if (!desc) {
 957                        rc = -EINVAL;
 958                        goto out;
 959                }
 960
 961                lprocfs_lov_init_vars(&lvars);
 962
 963                rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
 964                                              lcfg, obd);
 965                if (rc > 0)
 966                        rc = 0;
 967                goto out;
 968        }
 969        case LCFG_POOL_NEW:
 970        case LCFG_POOL_ADD:
 971        case LCFG_POOL_DEL:
 972        case LCFG_POOL_REM:
 973                goto out;
 974
 975        default: {
 976                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
 977                rc = -EINVAL;
 978                goto out;
 979
 980        }
 981        }
 982out:
 983        return rc;
 984}
 985
 986static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
 987                        struct lov_stripe_md **ea, struct obd_trans_info *oti)
 988{
 989        struct lov_stripe_md *obj_mdp, *lsm;
 990        struct lov_obd *lov = &exp->exp_obd->u.lov;
 991        unsigned ost_idx;
 992        int rc, i;
 993
 994        LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
 995                src_oa->o_flags & OBD_FL_RECREATE_OBJS);
 996
 997        OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
 998        if (obj_mdp == NULL)
 999                return -ENOMEM;
1000
1001        ost_idx = src_oa->o_nlink;
1002        lsm = *ea;
1003        if (lsm == NULL) {
1004                rc = -EINVAL;
1005                goto out;
1006        }
1007        if (ost_idx >= lov->desc.ld_tgt_count ||
1008            !lov->lov_tgts[ost_idx]) {
1009                rc = -EINVAL;
1010                goto out;
1011        }
1012
1013        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1014                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1015
1016                if (lov_oinfo_is_dummy(loi))
1017                        continue;
1018
1019                if (loi->loi_ost_idx == ost_idx) {
1020                        if (ostid_id(&loi->loi_oi) != ostid_id(&src_oa->o_oi)) {
1021                                rc = -EINVAL;
1022                                goto out;
1023                        }
1024                        break;
1025                }
1026        }
1027        if (i == lsm->lsm_stripe_count) {
1028                rc = -EINVAL;
1029                goto out;
1030        }
1031
1032        rc = obd_create(NULL, lov->lov_tgts[ost_idx]->ltd_exp,
1033                        src_oa, &obj_mdp, oti);
1034out:
1035        OBD_FREE(obj_mdp, sizeof(*obj_mdp));
1036        return rc;
1037}
1038
1039/* the LOV expects oa->o_id to be set to the LOV object id */
1040static int lov_create(const struct lu_env *env, struct obd_export *exp,
1041                      struct obdo *src_oa, struct lov_stripe_md **ea,
1042                      struct obd_trans_info *oti)
1043{
1044        struct lov_obd *lov;
1045        int rc = 0;
1046
1047        LASSERT(ea != NULL);
1048        if (exp == NULL)
1049                return -EINVAL;
1050
1051        if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1052            src_oa->o_flags == OBD_FL_DELORPHAN) {
1053                /* should be used with LOV anymore */
1054                LBUG();
1055        }
1056
1057        lov = &exp->exp_obd->u.lov;
1058        if (!lov->desc.ld_active_tgt_count)
1059                return -EIO;
1060
1061        obd_getref(exp->exp_obd);
1062        /* Recreate a specific object id at the given OST index */
1063        if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1064            (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1065                 rc = lov_recreate(exp, src_oa, ea, oti);
1066        }
1067
1068        obd_putref(exp->exp_obd);
1069        return rc;
1070}
1071
/*
 * Assert that a stripe md pointer is non-NULL and carries one of the two
 * layout magics accepted here (LOV_MAGIC_V1 or LOV_MAGIC_V3).  On a magic
 * mismatch the pointer and the offending magic are printed.
 * Note that the argument is evaluated more than once.
 */
#define ASSERT_LSM_MAGIC(lsmp)                                            \
do {                                                                        \
        LASSERT((lsmp) != NULL);                                                \
        LASSERTF(((lsmp)->lsm_magic == LOV_MAGIC_V1 ||                    \
                 (lsmp)->lsm_magic == LOV_MAGIC_V3),                        \
                 "%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic);            \
} while (0)
1079
1080static int lov_destroy(const struct lu_env *env, struct obd_export *exp,
1081                       struct obdo *oa, struct lov_stripe_md *lsm,
1082                       struct obd_trans_info *oti, struct obd_export *md_exp,
1083                       void *capa)
1084{
1085        struct lov_request_set *set;
1086        struct obd_info oinfo;
1087        struct lov_request *req;
1088        struct list_head *pos;
1089        struct lov_obd *lov;
1090        int rc = 0, err = 0;
1091
1092        ASSERT_LSM_MAGIC(lsm);
1093
1094        if (!exp || !exp->exp_obd)
1095                return -ENODEV;
1096
1097        if (oa->o_valid & OBD_MD_FLCOOKIE) {
1098                LASSERT(oti);
1099                LASSERT(oti->oti_logcookies);
1100        }
1101
1102        lov = &exp->exp_obd->u.lov;
1103        obd_getref(exp->exp_obd);
1104        rc = lov_prep_destroy_set(exp, &oinfo, oa, lsm, oti, &set);
1105        if (rc)
1106                goto out;
1107
1108        list_for_each(pos, &set->set_list) {
1109                req = list_entry(pos, struct lov_request, rq_link);
1110
1111                if (oa->o_valid & OBD_MD_FLCOOKIE)
1112                        oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1113
1114                err = obd_destroy(env, lov->lov_tgts[req->rq_idx]->ltd_exp,
1115                                  req->rq_oi.oi_oa, NULL, oti, NULL, capa);
1116                err = lov_update_common_set(set, req, err);
1117                if (err) {
1118                        CERROR("%s: destroying objid "DOSTID" subobj "
1119                               DOSTID" on OST idx %d: rc = %d\n",
1120                               exp->exp_obd->obd_name, POSTID(&oa->o_oi),
1121                               POSTID(&req->rq_oi.oi_oa->o_oi),
1122                               req->rq_idx, err);
1123                        if (!rc)
1124                                rc = err;
1125                }
1126        }
1127
1128        if (rc == 0) {
1129                LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
1130                rc = lsm_op_find(lsm->lsm_magic)->lsm_destroy(lsm, oa, md_exp);
1131        }
1132        err = lov_fini_destroy_set(set);
1133out:
1134        obd_putref(exp->exp_obd);
1135        return rc ? rc : err;
1136}
1137
1138static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
1139                                 void *data, int rc)
1140{
1141        struct lov_request_set *lovset = (struct lov_request_set *)data;
1142        int err;
1143
1144        /* don't do attribute merge if this async op failed */
1145        if (rc)
1146                atomic_set(&lovset->set_completes, 0);
1147        err = lov_fini_getattr_set(lovset);
1148        return rc ? rc : err;
1149}
1150
1151static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
1152                              struct ptlrpc_request_set *rqset)
1153{
1154        struct lov_request_set *lovset;
1155        struct lov_obd *lov;
1156        struct list_head *pos;
1157        struct lov_request *req;
1158        int rc = 0, err;
1159
1160        LASSERT(oinfo);
1161        ASSERT_LSM_MAGIC(oinfo->oi_md);
1162
1163        if (!exp || !exp->exp_obd)
1164                return -ENODEV;
1165
1166        lov = &exp->exp_obd->u.lov;
1167
1168        rc = lov_prep_getattr_set(exp, oinfo, &lovset);
1169        if (rc)
1170                return rc;
1171
1172        CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes\n",
1173               POSTID(&oinfo->oi_md->lsm_oi), oinfo->oi_md->lsm_stripe_count,
1174               oinfo->oi_md->lsm_stripe_size);
1175
1176        list_for_each(pos, &lovset->set_list) {
1177                req = list_entry(pos, struct lov_request, rq_link);
1178
1179                CDEBUG(D_INFO, "objid " DOSTID "[%d] has subobj " DOSTID " at idx%u\n",
1180                       POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
1181                       POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
1182                rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1183                                       &req->rq_oi, rqset);
1184                if (rc) {
1185                        CERROR("%s: getattr objid "DOSTID" subobj"
1186                               DOSTID" on OST idx %d: rc = %d\n",
1187                               exp->exp_obd->obd_name,
1188                               POSTID(&oinfo->oi_oa->o_oi),
1189                               POSTID(&req->rq_oi.oi_oa->o_oi),
1190                               req->rq_idx, rc);
1191                        goto out;
1192                }
1193        }
1194
1195        if (!list_empty(&rqset->set_requests)) {
1196                LASSERT(rc == 0);
1197                LASSERT(rqset->set_interpret == NULL);
1198                rqset->set_interpret = lov_getattr_interpret;
1199                rqset->set_arg = (void *)lovset;
1200                return rc;
1201        }
1202out:
1203        if (rc)
1204                atomic_set(&lovset->set_completes, 0);
1205        err = lov_fini_getattr_set(lovset);
1206        return rc ? rc : err;
1207}
1208
1209static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,
1210                                 void *data, int rc)
1211{
1212        struct lov_request_set *lovset = (struct lov_request_set *)data;
1213        int err;
1214
1215        if (rc)
1216                atomic_set(&lovset->set_completes, 0);
1217        err = lov_fini_setattr_set(lovset);
1218        return rc ? rc : err;
1219}
1220
1221/* If @oti is given, the request goes from MDS and responses from OSTs are not
1222   needed. Otherwise, a client is waiting for responses. */
1223static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
1224                             struct obd_trans_info *oti,
1225                             struct ptlrpc_request_set *rqset)
1226{
1227        struct lov_request_set *set;
1228        struct lov_request *req;
1229        struct list_head *pos;
1230        struct lov_obd *lov;
1231        int rc = 0;
1232
1233        LASSERT(oinfo);
1234        ASSERT_LSM_MAGIC(oinfo->oi_md);
1235        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
1236                LASSERT(oti);
1237                LASSERT(oti->oti_logcookies);
1238        }
1239
1240        if (!exp || !exp->exp_obd)
1241                return -ENODEV;
1242
1243        lov = &exp->exp_obd->u.lov;
1244        rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1245        if (rc)
1246                return rc;
1247
1248        CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes\n",
1249               POSTID(&oinfo->oi_md->lsm_oi),
1250               oinfo->oi_md->lsm_stripe_count,
1251               oinfo->oi_md->lsm_stripe_size);
1252
1253        list_for_each(pos, &set->set_list) {
1254                req = list_entry(pos, struct lov_request, rq_link);
1255
1256                if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1257                        oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1258
1259                CDEBUG(D_INFO, "objid " DOSTID "[%d] has subobj " DOSTID " at idx%u\n",
1260                       POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
1261                       POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
1262
1263                rc = obd_setattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1264                                       &req->rq_oi, oti, rqset);
1265                if (rc) {
1266                        CERROR("error: setattr objid "DOSTID" subobj"
1267                               DOSTID" on OST idx %d: rc = %d\n",
1268                               POSTID(&set->set_oi->oi_oa->o_oi),
1269                               POSTID(&req->rq_oi.oi_oa->o_oi),
1270                               req->rq_idx, rc);
1271                        break;
1272                }
1273        }
1274
1275        /* If we are not waiting for responses on async requests, return. */
1276        if (rc || !rqset || list_empty(&rqset->set_requests)) {
1277                int err;
1278                if (rc)
1279                        atomic_set(&set->set_completes, 0);
1280                err = lov_fini_setattr_set(set);
1281                return rc ? rc : err;
1282        }
1283
1284        LASSERT(rqset->set_interpret == NULL);
1285        rqset->set_interpret = lov_setattr_interpret;
1286        rqset->set_arg = (void *)set;
1287
1288        return 0;
1289}
1290
1291/* find any ldlm lock of the inode in lov
1292 * return 0    not find
1293 *      1    find one
1294 *      < 0    error */
1295static int lov_find_cbdata(struct obd_export *exp,
1296                           struct lov_stripe_md *lsm, ldlm_iterator_t it,
1297                           void *data)
1298{
1299        struct lov_obd *lov;
1300        int rc = 0, i;
1301
1302        ASSERT_LSM_MAGIC(lsm);
1303
1304        if (!exp || !exp->exp_obd)
1305                return -ENODEV;
1306
1307        lov = &exp->exp_obd->u.lov;
1308        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1309                struct lov_stripe_md submd;
1310                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1311
1312                if (lov_oinfo_is_dummy(loi))
1313                        continue;
1314
1315                if (!lov->lov_tgts[loi->loi_ost_idx]) {
1316                        CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
1317                        continue;
1318                }
1319
1320                submd.lsm_oi = loi->loi_oi;
1321                submd.lsm_stripe_count = 0;
1322                rc = obd_find_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1323                                     &submd, it, data);
1324                if (rc != 0)
1325                        return rc;
1326        }
1327        return rc;
1328}
1329
1330int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
1331{
1332        struct lov_request_set *lovset = (struct lov_request_set *)data;
1333        int err;
1334
1335        if (rc)
1336                atomic_set(&lovset->set_completes, 0);
1337
1338        err = lov_fini_statfs_set(lovset);
1339        return rc ? rc : err;
1340}
1341
1342static int lov_statfs_async(struct obd_export *exp, struct obd_info *oinfo,
1343                            __u64 max_age, struct ptlrpc_request_set *rqset)
1344{
1345        struct obd_device      *obd = class_exp2obd(exp);
1346        struct lov_request_set *set;
1347        struct lov_request *req;
1348        struct list_head *pos;
1349        struct lov_obd *lov;
1350        int rc = 0;
1351
1352        LASSERT(oinfo != NULL);
1353        LASSERT(oinfo->oi_osfs != NULL);
1354
1355        lov = &obd->u.lov;
1356        rc = lov_prep_statfs_set(obd, oinfo, &set);
1357        if (rc)
1358                return rc;
1359
1360        list_for_each(pos, &set->set_list) {
1361                req = list_entry(pos, struct lov_request, rq_link);
1362                rc = obd_statfs_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1363                                      &req->rq_oi, max_age, rqset);
1364                if (rc)
1365                        break;
1366        }
1367
1368        if (rc || list_empty(&rqset->set_requests)) {
1369                int err;
1370                if (rc)
1371                        atomic_set(&set->set_completes, 0);
1372                err = lov_fini_statfs_set(set);
1373                return rc ? rc : err;
1374        }
1375
1376        LASSERT(rqset->set_interpret == NULL);
1377        rqset->set_interpret = lov_statfs_interpret;
1378        rqset->set_arg = (void *)set;
1379        return 0;
1380}
1381
1382static int lov_statfs(const struct lu_env *env, struct obd_export *exp,
1383                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1384{
1385        struct ptlrpc_request_set *set = NULL;
1386        struct obd_info oinfo = { { { 0 } } };
1387        int rc = 0;
1388
1389        /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
1390         * statfs requests */
1391        set = ptlrpc_prep_set();
1392        if (set == NULL)
1393                return -ENOMEM;
1394
1395        oinfo.oi_osfs = osfs;
1396        oinfo.oi_flags = flags;
1397        rc = lov_statfs_async(exp, &oinfo, max_age, set);
1398        if (rc == 0)
1399                rc = ptlrpc_set_wait(set);
1400        ptlrpc_set_destroy(set);
1401
1402        return rc;
1403}
1404
/*
 * ioctl entry point of the LOV device.
 *
 * IOC_OBD_STATFS
 *      Reads a target index from data->ioc_inlbuf2 and copies that target's
 *      UUID to ioc_pbuf2 and its statfs data to ioc_pbuf1.
 * OBD_IOC_LOV_GET_CONFIG
 *      Fetches the ioctl data from \a uarg, fills inline buffer 1 with the
 *      LOV descriptor, buffer 2 with the target UUIDs and buffer 3 with the
 *      target generations, then copies the whole buffer back to \a uarg.
 * LL_IOC_LOV_GETSTRIPE
 *      Forwarded to lov_getstripe().
 * OBD_IOC_QUOTACTL
 *      Forwarded to the single target selected by index (QC_OSTIDX) or by
 *      UUID (QC_UUID); on success the reply is copied back into \a karg.
 * anything else
 *      Sent to every target that has an export.
 *
 * Returns 0 on success or a negative errno.
 */
static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obddev = class_exp2obd(exp);
        struct lov_obd *lov = &obddev->u.lov;
        int i = 0, rc = 0, count = lov->desc.ld_tgt_count;
        struct obd_uuid *uuidp;

        switch (cmd) {
        case IOC_OBD_STATFS: {
                struct obd_ioctl_data *data = karg;
                struct obd_device *osc_obd;
                struct obd_statfs stat_buf = {0};
                __u32 index;
                __u32 flags;

                /* The target index is passed in inline buffer 2. */
                memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
                if ((index >= count))
                        return -ENODEV;

                if (!lov->lov_tgts[index])
                        /* Try again with the next index */
                        return -EAGAIN;
                if (!lov->lov_tgts[index]->ltd_active)
                        return -ENODATA;

                osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
                if (!osc_obd)
                        return -EINVAL;

                /* copy UUID */
                /* NOTE(review): if ioc_plen2 is an unsigned type, the (int)
                 * cast turns a very large value negative and min() would
                 * then select it - confirm the length is validated before
                 * it reaches this point. */
                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
                                     min((int) data->ioc_plen2,
                                         (int) sizeof(struct obd_uuid))))
                        return -EFAULT;

                /* NOTE(review): uarg is dereferenced directly here; if it is
                 * a user-space pointer this read bypasses the usual
                 * copy_from_user() path - confirm. */
                flags = uarg ? *(__u32 *)uarg : 0;
                /* got statfs data */
                rc = obd_statfs(NULL, lov->lov_tgts[index]->ltd_exp, &stat_buf,
                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
                                flags);
                if (rc)
                        return rc;
                /* NOTE(review): same (int) cast concern as for ioc_plen2. */
                if (copy_to_user(data->ioc_pbuf1, &stat_buf,
                                     min((int) data->ioc_plen1,
                                         (int) sizeof(stat_buf))))
                        return -EFAULT;
                break;
        }
        case OBD_IOC_LOV_GET_CONFIG: {
                struct obd_ioctl_data *data;
                struct lov_desc *desc;
                char *buf = NULL;
                __u32 *genp;

                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        return -EINVAL;

                data = (struct obd_ioctl_data *)buf;

                /* Each inline buffer must be large enough for what is about
                 * to be written into it: one descriptor, 'count' UUIDs and
                 * 'count' generation numbers. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        return -EINVAL;
                }

                if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
                        obd_ioctl_freedata(buf, len);
                        return -EINVAL;
                }

                if (sizeof(__u32) * count > data->ioc_inllen3) {
                        obd_ioctl_freedata(buf, len);
                        return -EINVAL;
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                memcpy(desc, &(lov->desc), sizeof(*desc));

                uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
                genp = (__u32 *)data->ioc_inlbuf3;
                /* the uuid will be empty for deleted OSTs */
                for (i = 0; i < count; i++, uuidp++, genp++) {
                        if (!lov->lov_tgts[i])
                                continue;
                        *uuidp = lov->lov_tgts[i]->ltd_uuid;
                        *genp = lov->lov_tgts[i]->ltd_gen;
                }

                if (copy_to_user((void *)uarg, buf, len))
                        rc = -EFAULT;
                obd_ioctl_freedata(buf, len);
                break;
        }
        case LL_IOC_LOV_GETSTRIPE:
                rc = lov_getstripe(exp, karg, uarg);
                break;
        case OBD_IOC_QUOTACTL: {
                struct if_quotactl *qctl = karg;
                struct lov_tgt_desc *tgt = NULL;
                struct obd_quotactl *oqctl;

                if (qctl->qc_valid == QC_OSTIDX) {
                        /* Target selected directly by index. */
                        if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
                                return -EINVAL;

                        tgt = lov->lov_tgts[qctl->qc_idx];
                        if (!tgt || !tgt->ltd_exp)
                                return -EINVAL;
                } else if (qctl->qc_valid == QC_UUID) {
                        /* Target selected by UUID: linear search. */
                        for (i = 0; i < count; i++) {
                                tgt = lov->lov_tgts[i];
                                if (!tgt ||
                                    !obd_uuid_equals(&tgt->ltd_uuid,
                                                     &qctl->obd_uuid))
                                        continue;

                                if (tgt->ltd_exp == NULL)
                                        return -EINVAL;

                                break;
                        }
                } else {
                        return -EINVAL;
                }

                /* Only the UUID search can reach this with i >= count (no
                 * match); in the index case i is still 0 and count is at
                 * least 1. */
                if (i >= count)
                        return -EAGAIN;

                LASSERT(tgt && tgt->ltd_exp);
                OBD_ALLOC_PTR(oqctl);
                if (!oqctl)
                        return -ENOMEM;

                QCTL_COPY(oqctl, qctl);
                rc = obd_quotactl(tgt->ltd_exp, oqctl);
                if (rc == 0) {
                        /* Report back the result and which target it came
                         * from. */
                        QCTL_COPY(qctl, oqctl);
                        qctl->qc_valid = QC_OSTIDX;
                        qctl->obd_uuid = tgt->ltd_uuid;
                }
                OBD_FREE_PTR(oqctl);
                break;
        }
        default: {
                /* Becomes 1 once at least one target handled the command. */
                int set = 0;

                if (count == 0)
                        return -ENOTTY;

                for (i = 0; i < count; i++) {
                        int err;
                        struct obd_device *osc_obd;

                        /* OST was disconnected */
                        if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
                                continue;

                        /* ll_umount_begin() sets force flag but for lov, not
                         * osc. Let's pass it through */
                        osc_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
                        osc_obd->obd_force = obddev->obd_force;
                        err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
                                            len, karg, uarg);
                        if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
                                return err;
                        } else if (err) {
                                /* Errors from inactive targets are ignored;
                                 * for active ones the first error is kept. */
                                if (lov->lov_tgts[i]->ltd_active) {
                                        CDEBUG(err == -ENOTTY ?
                                               D_IOCTL : D_WARNING,
                                               "iocontrol OSC %s on OST idx %d cmd %x: err = %d\n",
                                               lov_uuid2str(lov, i),
                                               i, cmd, err);
                                        if (!rc)
                                                rc = err;
                                }
                        } else {
                                set = 1;
                        }
                }
                /* No target succeeded and none reported an error either. */
                if (!set && !rc)
                        rc = -EIO;
        }
        }

        return rc;
}
1592
/* NOTE(review): presumably the size in bytes of the scratch buffer used by
 * the FIEMAP code further down - confirm at the point of use. */
#define FIEMAP_BUFFER_SIZE 4096
1594
1595/**
1596 * Non-zero fe_logical indicates that this is a continuation FIEMAP
1597 * call. The local end offset and the device are sent in the first
1598 * fm_extent. This function calculates the stripe number from the index.
1599 * This function returns a stripe_no on which mapping is to be restarted.
1600 *
1601 * This function returns fm_end_offset which is the in-OST offset at which
1602 * mapping should be restarted. If fm_end_offset=0 is returned then caller
1603 * will re-calculate proper offset in next stripe.
1604 * Note that the first extent is passed to lov_get_info via the value field.
1605 *
1606 * \param fiemap fiemap request header
1607 * \param lsm striping information for the file
1608 * \param fm_start logical start of mapping
1609 * \param fm_end logical end of mapping
1610 * \param start_stripe starting stripe will be returned in this
1611 */
1612static u64 fiemap_calc_fm_end_offset(struct ll_user_fiemap *fiemap,
1613                                     struct lov_stripe_md *lsm, u64 fm_start,
1614                                     u64 fm_end, int *start_stripe)
1615{
1616        u64 local_end = fiemap->fm_extents[0].fe_logical;
1617        u64 lun_start, lun_end;
1618        u64 fm_end_offset;
1619        int stripe_no = -1, i;
1620
1621        if (fiemap->fm_extent_count == 0 ||
1622            fiemap->fm_extents[0].fe_logical == 0)
1623                return 0;
1624
1625        /* Find out stripe_no from ost_index saved in the fe_device */
1626        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1627                struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
1628
1629                if (lov_oinfo_is_dummy(oinfo))
1630                        continue;
1631
1632                if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1633                        stripe_no = i;
1634                        break;
1635                }
1636        }
1637        if (stripe_no == -1)
1638                return -EINVAL;
1639
1640        /* If we have finished mapping on previous device, shift logical
1641         * offset to start of next device */
1642        if ((lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
1643                                   &lun_start, &lun_end)) != 0 &&
1644                                   local_end < lun_end) {
1645                fm_end_offset = local_end;
1646                *start_stripe = stripe_no;
1647        } else {
1648                /* This is a special value to indicate that caller should
1649                 * calculate offset in next stripe. */
1650                fm_end_offset = 0;
1651                *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
1652        }
1653
1654        return fm_end_offset;
1655}
1656
1657/**
1658 * We calculate on which OST the mapping will end. If the length of mapping
1659 * is greater than (stripe_size * stripe_count) then the last_stripe will
1660 * will be one just before start_stripe. Else we check if the mapping
1661 * intersects each OST and find last_stripe.
1662 * This function returns the last_stripe and also sets the stripe_count
1663 * over which the mapping is spread
1664 *
1665 * \param lsm striping information for the file
1666 * \param fm_start logical start of mapping
1667 * \param fm_end logical end of mapping
1668 * \param start_stripe starting stripe of the mapping
1669 * \param stripe_count the number of stripes across which to map is returned
1670 *
1671 * \retval last_stripe return the last stripe of the mapping
1672 */
1673static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, u64 fm_start,
1674                                   u64 fm_end, int start_stripe,
1675                                   int *stripe_count)
1676{
1677        int last_stripe;
1678        u64 obd_start, obd_end;
1679        int i, j;
1680
1681        if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
1682                last_stripe = start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
1683                                                              start_stripe - 1;
1684                *stripe_count = lsm->lsm_stripe_count;
1685        } else {
1686                for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
1687                     i = (i + 1) % lsm->lsm_stripe_count, j++) {
1688                        if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
1689                                                   &obd_start, &obd_end)) == 0)
1690                                break;
1691                }
1692                *stripe_count = j;
1693                last_stripe = (start_stripe + j - 1) %lsm->lsm_stripe_count;
1694        }
1695
1696        return last_stripe;
1697}
1698
1699/**
1700 * Set fe_device and copy extents from local buffer into main return buffer.
1701 *
1702 * \param fiemap fiemap request header
1703 * \param lcl_fm_ext array of local fiemap extents to be copied
1704 * \param ost_index OST index to be written into the fm_device field for each
1705                    extent
1706 * \param ext_count number of extents to be copied
1707 * \param current_extent where to start copying in main extent array
1708 */
1709static void fiemap_prepare_and_copy_exts(struct ll_user_fiemap *fiemap,
1710                                         struct ll_fiemap_extent *lcl_fm_ext,
1711                                         int ost_index, unsigned int ext_count,
1712                                         int current_extent)
1713{
1714        char *to;
1715        int ext;
1716
1717        for (ext = 0; ext < ext_count; ext++) {
1718                lcl_fm_ext[ext].fe_device = ost_index;
1719                lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1720        }
1721
1722        /* Copy fm_extent's from fm_local to return buffer */
1723        to = (char *)fiemap + fiemap_count_to_size(current_extent);
1724        memcpy(to, lcl_fm_ext, ext_count * sizeof(struct ll_fiemap_extent));
1725}
1726
1727/**
1728 * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1729 * This also handles the restarting of FIEMAP calls in case mapping overflows
1730 * the available number of extents in single call.
1731 */
1732static int lov_fiemap(struct lov_obd *lov, __u32 keylen, void *key,
1733                      __u32 *vallen, void *val, struct lov_stripe_md *lsm)
1734{
1735        struct ll_fiemap_info_key *fm_key = key;
1736        struct ll_user_fiemap *fiemap = val;
1737        struct ll_user_fiemap *fm_local = NULL;
1738        struct ll_fiemap_extent *lcl_fm_ext;
1739        int count_local;
1740        unsigned int get_num_extents = 0;
1741        int ost_index = 0, actual_start_stripe, start_stripe;
1742        u64 fm_start, fm_end, fm_length, fm_end_offset;
1743        u64 curr_loc;
1744        int current_extent = 0, rc = 0, i;
1745        int ost_eof = 0; /* EOF for object */
1746        int ost_done = 0; /* done with required mapping for this OST? */
1747        int last_stripe;
1748        int cur_stripe = 0, cur_stripe_wrap = 0, stripe_count;
1749        unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
1750
1751        if (!lsm_has_objects(lsm)) {
1752                rc = 0;
1753                goto out;
1754        }
1755
1756        if (fiemap_count_to_size(fm_key->fiemap.fm_extent_count) < buffer_size)
1757                buffer_size = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
1758
1759        OBD_ALLOC_LARGE(fm_local, buffer_size);
1760        if (fm_local == NULL) {
1761                rc = -ENOMEM;
1762                goto out;
1763        }
1764        lcl_fm_ext = &fm_local->fm_extents[0];
1765
1766        count_local = fiemap_size_to_count(buffer_size);
1767
1768        memcpy(fiemap, &fm_key->fiemap, sizeof(*fiemap));
1769        fm_start = fiemap->fm_start;
1770        fm_length = fiemap->fm_length;
1771        /* Calculate start stripe, last stripe and length of mapping */
1772        actual_start_stripe = start_stripe = lov_stripe_number(lsm, fm_start);
1773        fm_end = (fm_length == ~0ULL ? fm_key->oa.o_size :
1774                                                fm_start + fm_length - 1);
1775        /* If fm_length != ~0ULL but fm_start+fm_length-1 exceeds file size */
1776        if (fm_end > fm_key->oa.o_size)
1777                fm_end = fm_key->oa.o_size;
1778
1779        last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
1780                                            actual_start_stripe, &stripe_count);
1781
1782        fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start,
1783                                                  fm_end, &start_stripe);
1784        if (fm_end_offset == -EINVAL) {
1785                rc = -EINVAL;
1786                goto out;
1787        }
1788
1789        if (fiemap_count_to_size(fiemap->fm_extent_count) > *vallen)
1790                fiemap->fm_extent_count = fiemap_size_to_count(*vallen);
1791        if (fiemap->fm_extent_count == 0) {
1792                get_num_extents = 1;
1793                count_local = 0;
1794        }
1795        /* Check each stripe */
1796        for (cur_stripe = start_stripe, i = 0; i < stripe_count;
1797             i++, cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
1798                u64 req_fm_len; /* Stores length of required mapping */
1799                u64 len_mapped_single_call;
1800                u64 lun_start, lun_end, obd_object_end;
1801                unsigned int ext_count;
1802
1803                cur_stripe_wrap = cur_stripe;
1804
1805                /* Find out range of mapping on this stripe */
1806                if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
1807                                           &lun_start, &obd_object_end)) == 0)
1808                        continue;
1809
1810                if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe])) {
1811                        rc = -EIO;
1812                        goto out;
1813                }
1814
1815                /* If this is a continuation FIEMAP call and we are on
1816                 * starting stripe then lun_start needs to be set to
1817                 * fm_end_offset */
1818                if (fm_end_offset != 0 && cur_stripe == start_stripe)
1819                        lun_start = fm_end_offset;
1820
1821                if (fm_length != ~0ULL) {
1822                        /* Handle fm_start + fm_length overflow */
1823                        if (fm_start + fm_length < fm_start)
1824                                fm_length = ~0ULL - fm_start;
1825                        lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
1826                                                     cur_stripe);
1827                } else {
1828                        lun_end = ~0ULL;
1829                }
1830
1831                if (lun_start == lun_end)
1832                        continue;
1833
1834                req_fm_len = obd_object_end - lun_start;
1835                fm_local->fm_length = 0;
1836                len_mapped_single_call = 0;
1837
1838                /* If the output buffer is very large and the objects have many
1839                 * extents we may need to loop on a single OST repeatedly */
1840                ost_eof = 0;
1841                ost_done = 0;
1842                do {
1843                        if (get_num_extents == 0) {
1844                                /* Don't get too many extents. */
1845                                if (current_extent + count_local >
1846                                    fiemap->fm_extent_count)
1847                                        count_local = fiemap->fm_extent_count -
1848                                                                 current_extent;
1849                        }
1850
1851                        lun_start += len_mapped_single_call;
1852                        fm_local->fm_length = req_fm_len - len_mapped_single_call;
1853                        req_fm_len = fm_local->fm_length;
1854                        fm_local->fm_extent_count = count_local;
1855                        fm_local->fm_mapped_extents = 0;
1856                        fm_local->fm_flags = fiemap->fm_flags;
1857
1858                        fm_key->oa.o_oi = lsm->lsm_oinfo[cur_stripe]->loi_oi;
1859                        ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
1860
1861                        if (ost_index < 0 ||
1862                            ost_index >= lov->desc.ld_tgt_count) {
1863                                rc = -EINVAL;
1864                                goto out;
1865                        }
1866
1867                        /* If OST is inactive, return extent with UNKNOWN flag */
1868                        if (!lov->lov_tgts[ost_index]->ltd_active) {
1869                                fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
1870                                fm_local->fm_mapped_extents = 1;
1871
1872                                lcl_fm_ext[0].fe_logical = lun_start;
1873                                lcl_fm_ext[0].fe_length = obd_object_end -
1874                                                                      lun_start;
1875                                lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1876
1877                                goto inactive_tgt;
1878                        }
1879
1880                        fm_local->fm_start = lun_start;
1881                        fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1882                        memcpy(&fm_key->fiemap, fm_local, sizeof(*fm_local));
1883                        *vallen=fiemap_count_to_size(fm_local->fm_extent_count);
1884                        rc = obd_get_info(NULL,
1885                                          lov->lov_tgts[ost_index]->ltd_exp,
1886                                          keylen, key, vallen, fm_local, lsm);
1887                        if (rc != 0)
1888                                goto out;
1889
1890inactive_tgt:
1891                        ext_count = fm_local->fm_mapped_extents;
1892                        if (ext_count == 0) {
1893                                ost_done = 1;
1894                                /* If last stripe has hole at the end,
1895                                 * then we need to return */
1896                                if (cur_stripe_wrap == last_stripe) {
1897                                        fiemap->fm_mapped_extents = 0;
1898                                        goto finish;
1899                                }
1900                                break;
1901                        }
1902
1903                        /* If we just need num of extents then go to next device */
1904                        if (get_num_extents) {
1905                                current_extent += ext_count;
1906                                break;
1907                        }
1908
1909                        len_mapped_single_call = lcl_fm_ext[ext_count-1].fe_logical -
1910                                  lun_start + lcl_fm_ext[ext_count - 1].fe_length;
1911
1912                        /* Have we finished mapping on this device? */
1913                        if (req_fm_len <= len_mapped_single_call)
1914                                ost_done = 1;
1915
1916                        /* Clear the EXTENT_LAST flag which can be present on
1917                         * last extent */
1918                        if (lcl_fm_ext[ext_count-1].fe_flags & FIEMAP_EXTENT_LAST)
1919                                lcl_fm_ext[ext_count - 1].fe_flags &=
1920                                                            ~FIEMAP_EXTENT_LAST;
1921
1922                        curr_loc = lov_stripe_size(lsm,
1923                                           lcl_fm_ext[ext_count - 1].fe_logical+
1924                                           lcl_fm_ext[ext_count - 1].fe_length,
1925                                           cur_stripe);
1926                        if (curr_loc >= fm_key->oa.o_size)
1927                                ost_eof = 1;
1928
1929                        fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
1930                                                     ost_index, ext_count,
1931                                                     current_extent);
1932
1933                        current_extent += ext_count;
1934
1935                        /* Ran out of available extents? */
1936                        if (current_extent >= fiemap->fm_extent_count)
1937                                goto finish;
1938                } while (ost_done == 0 && ost_eof == 0);
1939
1940                if (cur_stripe_wrap == last_stripe)
1941                        goto finish;
1942        }
1943
1944finish:
1945        /* Indicate that we are returning device offsets unless file just has
1946         * single stripe */
1947        if (lsm->lsm_stripe_count > 1)
1948                fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1949
1950        if (get_num_extents)
1951                goto skip_last_device_calc;
1952
1953        /* Check if we have reached the last stripe and whether mapping for that
1954         * stripe is done. */
1955        if (cur_stripe_wrap == last_stripe) {
1956                if (ost_done || ost_eof)
1957                        fiemap->fm_extents[current_extent - 1].fe_flags |=
1958                                                             FIEMAP_EXTENT_LAST;
1959        }
1960
1961skip_last_device_calc:
1962        fiemap->fm_mapped_extents = current_extent;
1963
1964out:
1965        OBD_FREE_LARGE(fm_local, buffer_size);
1966        return rc;
1967}
1968
1969static int lov_get_info(const struct lu_env *env, struct obd_export *exp,
1970                        __u32 keylen, void *key, __u32 *vallen, void *val,
1971                        struct lov_stripe_md *lsm)
1972{
1973        struct obd_device *obddev = class_exp2obd(exp);
1974        struct lov_obd *lov = &obddev->u.lov;
1975        int i, rc;
1976
1977        if (!vallen || !val)
1978                return -EFAULT;
1979
1980        obd_getref(obddev);
1981
1982        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
1983                struct {
1984                        char name[16];
1985                        struct ldlm_lock *lock;
1986                } *data = key;
1987                struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
1988                struct lov_oinfo *loi;
1989                __u32 *stripe = val;
1990
1991                if (*vallen < sizeof(*stripe)) {
1992                        rc = -EFAULT;
1993                        goto out;
1994                }
1995                *vallen = sizeof(*stripe);
1996
1997                /* XXX This is another one of those bits that will need to
1998                 * change if we ever actually support nested LOVs.  It uses
1999                 * the lock's export to find out which stripe it is. */
2000                /* XXX - it's assumed all the locks for deleted OSTs have
2001                 * been cancelled. Also, the export for deleted OSTs will
2002                 * be NULL and won't match the lock's export. */
2003                for (i = 0; i < lsm->lsm_stripe_count; i++) {
2004                        loi = lsm->lsm_oinfo[i];
2005                        if (lov_oinfo_is_dummy(loi))
2006                                continue;
2007
2008                        if (!lov->lov_tgts[loi->loi_ost_idx])
2009                                continue;
2010                        if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
2011                            data->lock->l_conn_export &&
2012                            ostid_res_name_eq(&loi->loi_oi, res_id)) {
2013                                *stripe = i;
2014                                rc = 0;
2015                                goto out;
2016                        }
2017                }
2018                LDLM_ERROR(data->lock, "lock on inode without such object");
2019                dump_lsm(D_ERROR, lsm);
2020                rc = -ENXIO;
2021                goto out;
2022        } else if (KEY_IS(KEY_LAST_ID)) {
2023                struct obd_id_info *info = val;
2024                __u32 size = sizeof(u64);
2025                struct lov_tgt_desc *tgt;
2026
2027                LASSERT(*vallen == sizeof(struct obd_id_info));
2028                tgt = lov->lov_tgts[info->idx];
2029
2030                if (!tgt || !tgt->ltd_active) {
2031                        rc = -ESRCH;
2032                        goto out;
2033                }
2034
2035                rc = obd_get_info(env, tgt->ltd_exp, keylen, key,
2036                                  &size, info->data, NULL);
2037                rc = 0;
2038                goto out;
2039        } else if (KEY_IS(KEY_LOVDESC)) {
2040                struct lov_desc *desc_ret = val;
2041                *desc_ret = lov->desc;
2042
2043                rc = 0;
2044                goto out;
2045        } else if (KEY_IS(KEY_FIEMAP)) {
2046                rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
2047                goto out;
2048        } else if (KEY_IS(KEY_CONNECT_FLAG)) {
2049                struct lov_tgt_desc *tgt;
2050                __u64 ost_idx = *((__u64 *)val);
2051
2052                LASSERT(*vallen == sizeof(__u64));
2053                LASSERT(ost_idx < lov->desc.ld_tgt_count);
2054                tgt = lov->lov_tgts[ost_idx];
2055
2056                if (!tgt || !tgt->ltd_exp) {
2057                        rc = -ESRCH;
2058                        goto out;
2059                }
2060
2061                *((__u64 *)val) = exp_connect_flags(tgt->ltd_exp);
2062                rc = 0;
2063                goto out;
2064        } else if (KEY_IS(KEY_TGT_COUNT)) {
2065                *((int *)val) = lov->desc.ld_tgt_count;
2066                rc = 0;
2067                goto out;
2068        }
2069
2070        rc = -EINVAL;
2071
2072out:
2073        obd_putref(obddev);
2074        return rc;
2075}
2076
2077static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
2078                              u32 keylen, void *key, u32 vallen,
2079                              void *val, struct ptlrpc_request_set *set)
2080{
2081        struct obd_device *obddev = class_exp2obd(exp);
2082        struct lov_obd *lov = &obddev->u.lov;
2083        u32 count;
2084        int i, rc = 0, err;
2085        struct lov_tgt_desc *tgt;
2086        unsigned incr, check_uuid,
2087                 do_inactive, no_set;
2088        unsigned next_id = 0,  mds_con = 0, capa = 0;
2089
2090        incr = check_uuid = do_inactive = no_set = 0;
2091        if (set == NULL) {
2092                no_set = 1;
2093                set = ptlrpc_prep_set();
2094                if (!set)
2095                        return -ENOMEM;
2096        }
2097
2098        obd_getref(obddev);
2099        count = lov->desc.ld_tgt_count;
2100
2101        if (KEY_IS(KEY_NEXT_ID)) {
2102                count = vallen / sizeof(struct obd_id_info);
2103                vallen = sizeof(u64);
2104                incr = sizeof(struct obd_id_info);
2105                do_inactive = 1;
2106                next_id = 1;
2107        } else if (KEY_IS(KEY_CHECKSUM)) {
2108                do_inactive = 1;
2109        } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2110                /* use defaults:  do_inactive = incr = 0; */
2111        } else if (KEY_IS(KEY_MDS_CONN)) {
2112                mds_con = 1;
2113        } else if (KEY_IS(KEY_CAPA_KEY)) {
2114                capa = 1;
2115        } else if (KEY_IS(KEY_CACHE_SET)) {
2116                LASSERT(lov->lov_cache == NULL);
2117                lov->lov_cache = val;
2118                do_inactive = 1;
2119        }
2120
2121        for (i = 0; i < count; i++, val = (char *)val + incr) {
2122                if (next_id) {
2123                        tgt = lov->lov_tgts[((struct obd_id_info *)val)->idx];
2124                } else {
2125                        tgt = lov->lov_tgts[i];
2126                }
2127                /* OST was disconnected */
2128                if (!tgt || !tgt->ltd_exp)
2129                        continue;
2130
2131                /* OST is inactive and we don't want inactive OSCs */
2132                if (!tgt->ltd_active && !do_inactive)
2133                        continue;
2134
2135                if (mds_con) {
2136                        struct mds_group_info *mgi;
2137
2138                        LASSERT(vallen == sizeof(*mgi));
2139                        mgi = (struct mds_group_info *)val;
2140
2141                        /* Only want a specific OSC */
2142                        if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
2143                                                &tgt->ltd_uuid))
2144                                continue;
2145
2146                        err = obd_set_info_async(env, tgt->ltd_exp,
2147                                         keylen, key, sizeof(int),
2148                                         &mgi->group, set);
2149                } else if (next_id) {
2150                        err = obd_set_info_async(env, tgt->ltd_exp,
2151                                         keylen, key, vallen,
2152                                         ((struct obd_id_info *)val)->data, set);
2153                } else if (capa) {
2154                        struct mds_capa_info *info = (struct mds_capa_info *)val;
2155
2156                        LASSERT(vallen == sizeof(*info));
2157
2158                         /* Only want a specific OSC */
2159                        if (info->uuid &&
2160                            !obd_uuid_equals(info->uuid, &tgt->ltd_uuid))
2161                                continue;
2162
2163                        err = obd_set_info_async(env, tgt->ltd_exp, keylen,
2164                                                 key, sizeof(*info->capa),
2165                                                 info->capa, set);
2166                } else {
2167                        /* Only want a specific OSC */
2168                        if (check_uuid &&
2169                            !obd_uuid_equals(val, &tgt->ltd_uuid))
2170                                continue;
2171
2172                        err = obd_set_info_async(env, tgt->ltd_exp,
2173                                         keylen, key, vallen, val, set);
2174                }
2175
2176                if (!rc)
2177                        rc = err;
2178        }
2179
2180        obd_putref(obddev);
2181        if (no_set) {
2182                err = ptlrpc_set_wait(set);
2183                if (!rc)
2184                        rc = err;
2185                ptlrpc_set_destroy(set);
2186        }
2187        return rc;
2188}
2189
/**
 * Take the spinlock protecting \a md and record the caller as its owner.
 *
 * The owner field is checked before and after acquisition: the caller must
 * not already be the recorded owner, and once the lock is held no owner may
 * be recorded.
 *
 * \param md stripe metadata whose lsm_lock is acquired
 */
2190void lov_stripe_lock(struct lov_stripe_md *md)
2191                __acquires(&md->lsm_lock)
2192{
        /* Catch an attempt to take the lock again from the current owner */
2193        LASSERT(md->lsm_lock_owner != current_pid());
2194        spin_lock(&md->lsm_lock);
        /* The previous holder must have cleared the owner on release */
2195        LASSERT(md->lsm_lock_owner == 0);
2196        md->lsm_lock_owner = current_pid();
2197}
2198EXPORT_SYMBOL(lov_stripe_lock);
2199
/**
 * Clear the recorded owner of \a md and release its spinlock.
 *
 * The caller must be the recorded owner of the lock.
 *
 * \param md stripe metadata whose lsm_lock is released
 */
2200void lov_stripe_unlock(struct lov_stripe_md *md)
2201                __releases(&md->lsm_lock)
2202{
2203        LASSERT(md->lsm_lock_owner == current_pid());
        /* Reset the owner while the lock is still held */
2204        md->lsm_lock_owner = 0;
2205        spin_unlock(&md->lsm_lock);
2206}
2207EXPORT_SYMBOL(lov_stripe_unlock);
2208
2209static int lov_quotactl(struct obd_device *obd, struct obd_export *exp,
2210                        struct obd_quotactl *oqctl)
2211{
2212        struct lov_obd      *lov = &obd->u.lov;
2213        struct lov_tgt_desc *tgt;
2214        __u64           curspace = 0;
2215        __u64           bhardlimit = 0;
2216        int               i, rc = 0;
2217
2218        if (oqctl->qc_cmd != LUSTRE_Q_QUOTAON &&
2219            oqctl->qc_cmd != LUSTRE_Q_QUOTAOFF &&
2220            oqctl->qc_cmd != Q_GETOQUOTA &&
2221            oqctl->qc_cmd != Q_INITQUOTA &&
2222            oqctl->qc_cmd != LUSTRE_Q_SETQUOTA &&
2223            oqctl->qc_cmd != Q_FINVALIDATE) {
2224                CERROR("bad quota opc %x for lov obd", oqctl->qc_cmd);
2225                return -EFAULT;
2226        }
2227
2228        /* for lov tgt */
2229        obd_getref(obd);
2230        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2231                int err;
2232
2233                tgt = lov->lov_tgts[i];
2234
2235                if (!tgt)
2236                        continue;
2237
2238                if (!tgt->ltd_active || tgt->ltd_reap) {
2239                        if (oqctl->qc_cmd == Q_GETOQUOTA &&
2240                            lov->lov_tgts[i]->ltd_activate) {
2241                                rc = -EREMOTEIO;
2242                                CERROR("ost %d is inactive\n", i);
2243                        } else {
2244                                CDEBUG(D_HA, "ost %d is inactive\n", i);
2245                        }
2246                        continue;
2247                }
2248
2249                err = obd_quotactl(tgt->ltd_exp, oqctl);
2250                if (err) {
2251                        if (tgt->ltd_active && !rc)
2252                                rc = err;
2253                        continue;
2254                }
2255
2256                if (oqctl->qc_cmd == Q_GETOQUOTA) {
2257                        curspace += oqctl->qc_dqblk.dqb_curspace;
2258                        bhardlimit += oqctl->qc_dqblk.dqb_bhardlimit;
2259                }
2260        }
2261        obd_putref(obd);
2262
2263        if (oqctl->qc_cmd == Q_GETOQUOTA) {
2264                oqctl->qc_dqblk.dqb_curspace = curspace;
2265                oqctl->qc_dqblk.dqb_bhardlimit = bhardlimit;
2266        }
2267        return rc;
2268}
2269
2270static int lov_quotacheck(struct obd_device *obd, struct obd_export *exp,
2271                          struct obd_quotactl *oqctl)
2272{
2273        struct lov_obd *lov = &obd->u.lov;
2274        int          i, rc = 0;
2275
2276        obd_getref(obd);
2277
2278        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2279                if (!lov->lov_tgts[i])
2280                        continue;
2281
2282                /* Skip quota check on the administratively disabled OSTs. */
2283                if (!lov->lov_tgts[i]->ltd_activate) {
2284                        CWARN("lov idx %d was administratively disabled, skip quotacheck on it.\n",
2285                              i);
2286                        continue;
2287                }
2288
2289                if (!lov->lov_tgts[i]->ltd_active) {
2290                        CERROR("lov idx %d inactive\n", i);
2291                        rc = -EIO;
2292                        goto out;
2293                }
2294        }
2295
2296        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2297                int err;
2298
2299                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_activate)
2300                        continue;
2301
2302                err = obd_quotacheck(lov->lov_tgts[i]->ltd_exp, oqctl);
2303                if (err && !rc)
2304                        rc = err;
2305        }
2306
2307out:
2308        obd_putref(obd);
2309
2310        return rc;
2311}
2312
2313static struct obd_ops lov_obd_ops = {
2314        .o_owner               = THIS_MODULE,
2315        .o_setup               = lov_setup,
2316        .o_precleanup     = lov_precleanup,
2317        .o_cleanup           = lov_cleanup,
2318        /*.o_process_config      = lov_process_config,*/
2319        .o_connect           = lov_connect,
2320        .o_disconnect     = lov_disconnect,
2321        .o_statfs             = lov_statfs,
2322        .o_statfs_async = lov_statfs_async,
2323        .o_packmd             = lov_packmd,
2324        .o_unpackmd         = lov_unpackmd,
2325        .o_create             = lov_create,
2326        .o_destroy           = lov_destroy,
2327        .o_getattr_async       = lov_getattr_async,
2328        .o_setattr_async       = lov_setattr_async,
2329        .o_adjust_kms     = lov_adjust_kms,
2330        .o_find_cbdata   = lov_find_cbdata,
2331        .o_iocontrol       = lov_iocontrol,
2332        .o_get_info         = lov_get_info,
2333        .o_set_info_async      = lov_set_info_async,
2334        .o_notify             = lov_notify,
2335        .o_pool_new         = lov_pool_new,
2336        .o_pool_rem         = lov_pool_remove,
2337        .o_pool_add         = lov_pool_add,
2338        .o_pool_del         = lov_pool_del,
2339        .o_getref             = lov_getref,
2340        .o_putref             = lov_putref,
2341        .o_quotactl         = lov_quotactl,
2342        .o_quotacheck     = lov_quotacheck,
2343};
2344
2345struct kmem_cache *lov_oinfo_slab;
2346
2347static int __init lov_init(void)
2348{
2349        struct lprocfs_static_vars lvars = { NULL };
2350        int rc;
2351
2352        /* print an address of _any_ initialized kernel symbol from this
2353         * module, to allow debugging with gdb that doesn't support data
2354         * symbols from modules.*/
2355        CDEBUG(D_INFO, "Lustre LOV module (%p).\n", &lov_caches);
2356
2357        rc = lu_kmem_init(lov_caches);
2358        if (rc)
2359                return rc;
2360
2361        lov_oinfo_slab = kmem_cache_create("lov_oinfo",
2362                                              sizeof(struct lov_oinfo),
2363                                              0, SLAB_HWCACHE_ALIGN, NULL);
2364        if (lov_oinfo_slab == NULL) {
2365                lu_kmem_fini(lov_caches);
2366                return -ENOMEM;
2367        }
2368        lprocfs_lov_init_vars(&lvars);
2369
2370        rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2371                                 LUSTRE_LOV_NAME, &lov_device_type);
2372
2373        if (rc) {
2374                kmem_cache_destroy(lov_oinfo_slab);
2375                lu_kmem_fini(lov_caches);
2376        }
2377
2378        return rc;
2379}
2380
/**
 * Module exit point: unregister the LOV obd type, then release the
 * lov_oinfo slab and the LOV caches that lov_init() set up.
 */
2381static void /*__exit*/ lov_exit(void)
2382{
        /* Unregister first, then free the slab and caches */
2383        class_unregister_type(LUSTRE_LOV_NAME);
2384        kmem_cache_destroy(lov_oinfo_slab);
2385
2386        lu_kmem_fini(lov_caches);
2387}
2388
/* Module metadata */
2389MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2390MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
2391MODULE_LICENSE("GPL");
2392MODULE_VERSION(LUSTRE_VERSION_STRING);
2393
/* Hook lov_init()/lov_exit() up as the module's load and unload handlers */
2394module_init(lov_init);
2395module_exit(lov_exit);
2396