linux/drivers/staging/lustre/lustre/obdclass/genops.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/obdclass/genops.c
  37 *
  38 * These are the only exported functions, they provide some generic
  39 * infrastructure for managing object devices
  40 */
  41
  42#define DEBUG_SUBSYSTEM S_CLASS
  43#include <obd_ost.h>
  44#include <obd_class.h>
  45#include <lprocfs_status.h>
  46
  47extern struct list_head obd_types;
  48spinlock_t obd_types_lock;
  49
  50struct kmem_cache *obd_device_cachep;
  51struct kmem_cache *obdo_cachep;
  52EXPORT_SYMBOL(obdo_cachep);
  53struct kmem_cache *import_cachep;
  54
  55struct list_head      obd_zombie_imports;
  56struct list_head      obd_zombie_exports;
  57spinlock_t  obd_zombie_impexp_lock;
  58static void obd_zombie_impexp_notify(void);
  59static void obd_zombie_export_add(struct obd_export *exp);
  60static void obd_zombie_import_add(struct obd_import *imp);
  61static void print_export_data(struct obd_export *exp,
  62                              const char *status, int locks);
  63
  64int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
  65EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
  66
  67/*
  68 * support functions: we could use inter-module communication, but this
  69 * is more portable to other OS's
  70 */
  71static struct obd_device *obd_device_alloc(void)
  72{
  73        struct obd_device *obd;
  74
  75        OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
  76        if (obd != NULL) {
  77                obd->obd_magic = OBD_DEVICE_MAGIC;
  78        }
  79        return obd;
  80}
  81
  82static void obd_device_free(struct obd_device *obd)
  83{
  84        LASSERT(obd != NULL);
  85        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
  86                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
  87        if (obd->obd_namespace != NULL) {
  88                CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
  89                       obd, obd->obd_namespace, obd->obd_force);
  90                LBUG();
  91        }
  92        lu_ref_fini(&obd->obd_reference);
  93        OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
  94}
  95
  96struct obd_type *class_search_type(const char *name)
  97{
  98        struct list_head *tmp;
  99        struct obd_type *type;
 100
 101        spin_lock(&obd_types_lock);
 102        list_for_each(tmp, &obd_types) {
 103                type = list_entry(tmp, struct obd_type, typ_chain);
 104                if (strcmp(type->typ_name, name) == 0) {
 105                        spin_unlock(&obd_types_lock);
 106                        return type;
 107                }
 108        }
 109        spin_unlock(&obd_types_lock);
 110        return NULL;
 111}
 112EXPORT_SYMBOL(class_search_type);
 113
 114struct obd_type *class_get_type(const char *name)
 115{
 116        struct obd_type *type = class_search_type(name);
 117
 118        if (!type) {
 119                const char *modname = name;
 120
 121                if (strcmp(modname, "obdfilter") == 0)
 122                        modname = "ofd";
 123
 124                if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
 125                        modname = LUSTRE_OSP_NAME;
 126
 127                if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
 128                        modname = LUSTRE_MDT_NAME;
 129
 130                if (!request_module("%s", modname)) {
 131                        CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
 132                        type = class_search_type(name);
 133                } else {
 134                        LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
 135                                           modname);
 136                }
 137        }
 138        if (type) {
 139                spin_lock(&type->obd_type_lock);
 140                type->typ_refcnt++;
 141                try_module_get(type->typ_dt_ops->o_owner);
 142                spin_unlock(&type->obd_type_lock);
 143        }
 144        return type;
 145}
 146EXPORT_SYMBOL(class_get_type);
 147
 148void class_put_type(struct obd_type *type)
 149{
 150        LASSERT(type);
 151        spin_lock(&type->obd_type_lock);
 152        type->typ_refcnt--;
 153        module_put(type->typ_dt_ops->o_owner);
 154        spin_unlock(&type->obd_type_lock);
 155}
 156EXPORT_SYMBOL(class_put_type);
 157
 158#define CLASS_MAX_NAME 1024
 159
 160int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
 161                        struct lprocfs_vars *vars, const char *name,
 162                        struct lu_device_type *ldt)
 163{
 164        struct obd_type *type;
 165        int rc = 0;
 166
 167        /* sanity check */
 168        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
 169
 170        if (class_search_type(name)) {
 171                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
 172                return -EEXIST;
 173        }
 174
 175        rc = -ENOMEM;
 176        OBD_ALLOC(type, sizeof(*type));
 177        if (type == NULL)
 178                return rc;
 179
 180        OBD_ALLOC_PTR(type->typ_dt_ops);
 181        OBD_ALLOC_PTR(type->typ_md_ops);
 182        OBD_ALLOC(type->typ_name, strlen(name) + 1);
 183
 184        if (type->typ_dt_ops == NULL ||
 185            type->typ_md_ops == NULL ||
 186            type->typ_name == NULL)
 187                GOTO (failed, rc);
 188
 189        *(type->typ_dt_ops) = *dt_ops;
 190        /* md_ops is optional */
 191        if (md_ops)
 192                *(type->typ_md_ops) = *md_ops;
 193        strcpy(type->typ_name, name);
 194        spin_lock_init(&type->obd_type_lock);
 195
 196#ifdef LPROCFS
 197        type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
 198                                              vars, type);
 199        if (IS_ERR(type->typ_procroot)) {
 200                rc = PTR_ERR(type->typ_procroot);
 201                type->typ_procroot = NULL;
 202                GOTO (failed, rc);
 203        }
 204#endif
 205        if (ldt != NULL) {
 206                type->typ_lu = ldt;
 207                rc = lu_device_type_init(ldt);
 208                if (rc != 0)
 209                        GOTO (failed, rc);
 210        }
 211
 212        spin_lock(&obd_types_lock);
 213        list_add(&type->typ_chain, &obd_types);
 214        spin_unlock(&obd_types_lock);
 215
 216        return 0;
 217
 218 failed:
 219        if (type->typ_name != NULL)
 220                OBD_FREE(type->typ_name, strlen(name) + 1);
 221        if (type->typ_md_ops != NULL)
 222                OBD_FREE_PTR(type->typ_md_ops);
 223        if (type->typ_dt_ops != NULL)
 224                OBD_FREE_PTR(type->typ_dt_ops);
 225        OBD_FREE(type, sizeof(*type));
 226        return rc;
 227}
 228EXPORT_SYMBOL(class_register_type);
 229
 230int class_unregister_type(const char *name)
 231{
 232        struct obd_type *type = class_search_type(name);
 233
 234        if (!type) {
 235                CERROR("unknown obd type\n");
 236                return -EINVAL;
 237        }
 238
 239        if (type->typ_refcnt) {
 240                CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
 241                /* This is a bad situation, let's make the best of it */
 242                /* Remove ops, but leave the name for debugging */
 243                OBD_FREE_PTR(type->typ_dt_ops);
 244                OBD_FREE_PTR(type->typ_md_ops);
 245                return -EBUSY;
 246        }
 247
 248        if (type->typ_procroot) {
 249                lprocfs_remove(&type->typ_procroot);
 250        }
 251
 252        if (type->typ_lu)
 253                lu_device_type_fini(type->typ_lu);
 254
 255        spin_lock(&obd_types_lock);
 256        list_del(&type->typ_chain);
 257        spin_unlock(&obd_types_lock);
 258        OBD_FREE(type->typ_name, strlen(name) + 1);
 259        if (type->typ_dt_ops != NULL)
 260                OBD_FREE_PTR(type->typ_dt_ops);
 261        if (type->typ_md_ops != NULL)
 262                OBD_FREE_PTR(type->typ_md_ops);
 263        OBD_FREE(type, sizeof(*type));
 264        return 0;
 265} /* class_unregister_type */
 266EXPORT_SYMBOL(class_unregister_type);
 267
 268/**
 269 * Create a new obd device.
 270 *
 271 * Find an empty slot in ::obd_devs[], create a new obd device in it.
 272 *
 273 * \param[in] type_name obd device type string.
 274 * \param[in] name      obd device name.
 275 *
 276 * \retval NULL if create fails, otherwise return the obd device
 277 *       pointer created.
 278 */
 279struct obd_device *class_newdev(const char *type_name, const char *name)
 280{
 281        struct obd_device *result = NULL;
 282        struct obd_device *newdev;
 283        struct obd_type *type = NULL;
 284        int i;
 285        int new_obd_minor = 0;
 286
 287        if (strlen(name) >= MAX_OBD_NAME) {
 288                CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
 289                return ERR_PTR(-EINVAL);
 290        }
 291
 292        type = class_get_type(type_name);
 293        if (type == NULL){
 294                CERROR("OBD: unknown type: %s\n", type_name);
 295                return ERR_PTR(-ENODEV);
 296        }
 297
 298        newdev = obd_device_alloc();
 299        if (newdev == NULL)
 300                GOTO(out_type, result = ERR_PTR(-ENOMEM));
 301
 302        LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
 303
 304        write_lock(&obd_dev_lock);
 305        for (i = 0; i < class_devno_max(); i++) {
 306                struct obd_device *obd = class_num2obd(i);
 307
 308                if (obd && (strcmp(name, obd->obd_name) == 0)) {
 309                        CERROR("Device %s already exists at %d, won't add\n",
 310                               name, i);
 311                        if (result) {
 312                                LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
 313                                         "%p obd_magic %08x != %08x\n", result,
 314                                         result->obd_magic, OBD_DEVICE_MAGIC);
 315                                LASSERTF(result->obd_minor == new_obd_minor,
 316                                         "%p obd_minor %d != %d\n", result,
 317                                         result->obd_minor, new_obd_minor);
 318
 319                                obd_devs[result->obd_minor] = NULL;
 320                                result->obd_name[0]='\0';
 321                         }
 322                        result = ERR_PTR(-EEXIST);
 323                        break;
 324                }
 325                if (!result && !obd) {
 326                        result = newdev;
 327                        result->obd_minor = i;
 328                        new_obd_minor = i;
 329                        result->obd_type = type;
 330                        strncpy(result->obd_name, name,
 331                                sizeof(result->obd_name) - 1);
 332                        obd_devs[i] = result;
 333                }
 334        }
 335        write_unlock(&obd_dev_lock);
 336
 337        if (result == NULL && i >= class_devno_max()) {
 338                CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
 339                       class_devno_max());
 340                GOTO(out, result = ERR_PTR(-EOVERFLOW));
 341        }
 342
 343        if (IS_ERR(result))
 344                GOTO(out, result);
 345
 346        CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
 347               result->obd_name, result);
 348
 349        return result;
 350out:
 351        obd_device_free(newdev);
 352out_type:
 353        class_put_type(type);
 354        return result;
 355}
 356
 357void class_release_dev(struct obd_device *obd)
 358{
 359        struct obd_type *obd_type = obd->obd_type;
 360
 361        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
 362                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
 363        LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
 364                 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
 365        LASSERT(obd_type != NULL);
 366
 367        CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
 368               obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
 369
 370        write_lock(&obd_dev_lock);
 371        obd_devs[obd->obd_minor] = NULL;
 372        write_unlock(&obd_dev_lock);
 373        obd_device_free(obd);
 374
 375        class_put_type(obd_type);
 376}
 377
 378int class_name2dev(const char *name)
 379{
 380        int i;
 381
 382        if (!name)
 383                return -1;
 384
 385        read_lock(&obd_dev_lock);
 386        for (i = 0; i < class_devno_max(); i++) {
 387                struct obd_device *obd = class_num2obd(i);
 388
 389                if (obd && strcmp(name, obd->obd_name) == 0) {
 390                        /* Make sure we finished attaching before we give
 391                           out any references */
 392                        LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
 393                        if (obd->obd_attached) {
 394                                read_unlock(&obd_dev_lock);
 395                                return i;
 396                        }
 397                        break;
 398                }
 399        }
 400        read_unlock(&obd_dev_lock);
 401
 402        return -1;
 403}
 404EXPORT_SYMBOL(class_name2dev);
 405
 406struct obd_device *class_name2obd(const char *name)
 407{
 408        int dev = class_name2dev(name);
 409
 410        if (dev < 0 || dev > class_devno_max())
 411                return NULL;
 412        return class_num2obd(dev);
 413}
 414EXPORT_SYMBOL(class_name2obd);
 415
 416int class_uuid2dev(struct obd_uuid *uuid)
 417{
 418        int i;
 419
 420        read_lock(&obd_dev_lock);
 421        for (i = 0; i < class_devno_max(); i++) {
 422                struct obd_device *obd = class_num2obd(i);
 423
 424                if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
 425                        LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
 426                        read_unlock(&obd_dev_lock);
 427                        return i;
 428                }
 429        }
 430        read_unlock(&obd_dev_lock);
 431
 432        return -1;
 433}
 434EXPORT_SYMBOL(class_uuid2dev);
 435
 436struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
 437{
 438        int dev = class_uuid2dev(uuid);
 439        if (dev < 0)
 440                return NULL;
 441        return class_num2obd(dev);
 442}
 443EXPORT_SYMBOL(class_uuid2obd);
 444
 445/**
 446 * Get obd device from ::obd_devs[]
 447 *
 448 * \param num [in] array index
 449 *
 450 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
 451 *       otherwise return the obd device there.
 452 */
 453struct obd_device *class_num2obd(int num)
 454{
 455        struct obd_device *obd = NULL;
 456
 457        if (num < class_devno_max()) {
 458                obd = obd_devs[num];
 459                if (obd == NULL)
 460                        return NULL;
 461
 462                LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
 463                         "%p obd_magic %08x != %08x\n",
 464                         obd, obd->obd_magic, OBD_DEVICE_MAGIC);
 465                LASSERTF(obd->obd_minor == num,
 466                         "%p obd_minor %0d != %0d\n",
 467                         obd, obd->obd_minor, num);
 468        }
 469
 470        return obd;
 471}
 472EXPORT_SYMBOL(class_num2obd);
 473
 474/**
 475 * Get obd devices count. Device in any
 476 *    state are counted
 477 * \retval obd device count
 478 */
 479int get_devices_count(void)
 480{
 481        int index, max_index = class_devno_max(), dev_count = 0;
 482
 483        read_lock(&obd_dev_lock);
 484        for (index = 0; index <= max_index; index++) {
 485                struct obd_device *obd = class_num2obd(index);
 486                if (obd != NULL)
 487                        dev_count++;
 488        }
 489        read_unlock(&obd_dev_lock);
 490
 491        return dev_count;
 492}
 493EXPORT_SYMBOL(get_devices_count);
 494
 495void class_obd_list(void)
 496{
 497        char *status;
 498        int i;
 499
 500        read_lock(&obd_dev_lock);
 501        for (i = 0; i < class_devno_max(); i++) {
 502                struct obd_device *obd = class_num2obd(i);
 503
 504                if (obd == NULL)
 505                        continue;
 506                if (obd->obd_stopping)
 507                        status = "ST";
 508                else if (obd->obd_set_up)
 509                        status = "UP";
 510                else if (obd->obd_attached)
 511                        status = "AT";
 512                else
 513                        status = "--";
 514                LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
 515                         i, status, obd->obd_type->typ_name,
 516                         obd->obd_name, obd->obd_uuid.uuid,
 517                         atomic_read(&obd->obd_refcount));
 518        }
 519        read_unlock(&obd_dev_lock);
 520        return;
 521}
 522
 523/* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
 524   specified, then only the client with that uuid is returned,
 525   otherwise any client connected to the tgt is returned. */
 526struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
 527                                          const char * typ_name,
 528                                          struct obd_uuid *grp_uuid)
 529{
 530        int i;
 531
 532        read_lock(&obd_dev_lock);
 533        for (i = 0; i < class_devno_max(); i++) {
 534                struct obd_device *obd = class_num2obd(i);
 535
 536                if (obd == NULL)
 537                        continue;
 538                if ((strncmp(obd->obd_type->typ_name, typ_name,
 539                             strlen(typ_name)) == 0)) {
 540                        if (obd_uuid_equals(tgt_uuid,
 541                                            &obd->u.cli.cl_target_uuid) &&
 542                            ((grp_uuid)? obd_uuid_equals(grp_uuid,
 543                                                         &obd->obd_uuid) : 1)) {
 544                                read_unlock(&obd_dev_lock);
 545                                return obd;
 546                        }
 547                }
 548        }
 549        read_unlock(&obd_dev_lock);
 550
 551        return NULL;
 552}
 553EXPORT_SYMBOL(class_find_client_obd);
 554
 555/* Iterate the obd_device list looking devices have grp_uuid. Start
 556   searching at *next, and if a device is found, the next index to look
 557   at is saved in *next. If next is NULL, then the first matching device
 558   will always be returned. */
 559struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
 560{
 561        int i;
 562
 563        if (next == NULL)
 564                i = 0;
 565        else if (*next >= 0 && *next < class_devno_max())
 566                i = *next;
 567        else
 568                return NULL;
 569
 570        read_lock(&obd_dev_lock);
 571        for (; i < class_devno_max(); i++) {
 572                struct obd_device *obd = class_num2obd(i);
 573
 574                if (obd == NULL)
 575                        continue;
 576                if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
 577                        if (next != NULL)
 578                                *next = i+1;
 579                        read_unlock(&obd_dev_lock);
 580                        return obd;
 581                }
 582        }
 583        read_unlock(&obd_dev_lock);
 584
 585        return NULL;
 586}
 587EXPORT_SYMBOL(class_devices_in_group);
 588
 589/**
 590 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
 591 * adjust sptlrpc settings accordingly.
 592 */
 593int class_notify_sptlrpc_conf(const char *fsname, int namelen)
 594{
 595        struct obd_device  *obd;
 596        const char       *type;
 597        int              i, rc = 0, rc2;
 598
 599        LASSERT(namelen > 0);
 600
 601        read_lock(&obd_dev_lock);
 602        for (i = 0; i < class_devno_max(); i++) {
 603                obd = class_num2obd(i);
 604
 605                if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
 606                        continue;
 607
 608                /* only notify mdc, osc, mdt, ost */
 609                type = obd->obd_type->typ_name;
 610                if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
 611                    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
 612                    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
 613                    strcmp(type, LUSTRE_OST_NAME) != 0)
 614                        continue;
 615
 616                if (strncmp(obd->obd_name, fsname, namelen))
 617                        continue;
 618
 619                class_incref(obd, __FUNCTION__, obd);
 620                read_unlock(&obd_dev_lock);
 621                rc2 = obd_set_info_async(NULL, obd->obd_self_export,
 622                                         sizeof(KEY_SPTLRPC_CONF),
 623                                         KEY_SPTLRPC_CONF, 0, NULL, NULL);
 624                rc = rc ? rc : rc2;
 625                class_decref(obd, __FUNCTION__, obd);
 626                read_lock(&obd_dev_lock);
 627        }
 628        read_unlock(&obd_dev_lock);
 629        return rc;
 630}
 631EXPORT_SYMBOL(class_notify_sptlrpc_conf);
 632
 633void obd_cleanup_caches(void)
 634{
 635        if (obd_device_cachep) {
 636                kmem_cache_destroy(obd_device_cachep);
 637                obd_device_cachep = NULL;
 638        }
 639        if (obdo_cachep) {
 640                kmem_cache_destroy(obdo_cachep);
 641                obdo_cachep = NULL;
 642        }
 643        if (import_cachep) {
 644                kmem_cache_destroy(import_cachep);
 645                import_cachep = NULL;
 646        }
 647        if (capa_cachep) {
 648                kmem_cache_destroy(capa_cachep);
 649                capa_cachep = NULL;
 650        }
 651}
 652
 653int obd_init_caches(void)
 654{
 655        LASSERT(obd_device_cachep == NULL);
 656        obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
 657                                                 sizeof(struct obd_device),
 658                                                 0, 0, NULL);
 659        if (!obd_device_cachep)
 660                GOTO(out, -ENOMEM);
 661
 662        LASSERT(obdo_cachep == NULL);
 663        obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
 664                                           0, 0, NULL);
 665        if (!obdo_cachep)
 666                GOTO(out, -ENOMEM);
 667
 668        LASSERT(import_cachep == NULL);
 669        import_cachep = kmem_cache_create("ll_import_cache",
 670                                             sizeof(struct obd_import),
 671                                             0, 0, NULL);
 672        if (!import_cachep)
 673                GOTO(out, -ENOMEM);
 674
 675        LASSERT(capa_cachep == NULL);
 676        capa_cachep = kmem_cache_create("capa_cache",
 677                                           sizeof(struct obd_capa), 0, 0, NULL);
 678        if (!capa_cachep)
 679                GOTO(out, -ENOMEM);
 680
 681        return 0;
 682 out:
 683        obd_cleanup_caches();
 684        return -ENOMEM;
 685
 686}
 687
 688/* map connection to client */
 689struct obd_export *class_conn2export(struct lustre_handle *conn)
 690{
 691        struct obd_export *export;
 692
 693        if (!conn) {
 694                CDEBUG(D_CACHE, "looking for null handle\n");
 695                return NULL;
 696        }
 697
 698        if (conn->cookie == -1) {  /* this means assign a new connection */
 699                CDEBUG(D_CACHE, "want a new connection\n");
 700                return NULL;
 701        }
 702
 703        CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
 704        export = class_handle2object(conn->cookie);
 705        return export;
 706}
 707EXPORT_SYMBOL(class_conn2export);
 708
 709struct obd_device *class_exp2obd(struct obd_export *exp)
 710{
 711        if (exp)
 712                return exp->exp_obd;
 713        return NULL;
 714}
 715EXPORT_SYMBOL(class_exp2obd);
 716
 717struct obd_device *class_conn2obd(struct lustre_handle *conn)
 718{
 719        struct obd_export *export;
 720        export = class_conn2export(conn);
 721        if (export) {
 722                struct obd_device *obd = export->exp_obd;
 723                class_export_put(export);
 724                return obd;
 725        }
 726        return NULL;
 727}
 728EXPORT_SYMBOL(class_conn2obd);
 729
 730struct obd_import *class_exp2cliimp(struct obd_export *exp)
 731{
 732        struct obd_device *obd = exp->exp_obd;
 733        if (obd == NULL)
 734                return NULL;
 735        return obd->u.cli.cl_import;
 736}
 737EXPORT_SYMBOL(class_exp2cliimp);
 738
 739struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
 740{
 741        struct obd_device *obd = class_conn2obd(conn);
 742        if (obd == NULL)
 743                return NULL;
 744        return obd->u.cli.cl_import;
 745}
 746EXPORT_SYMBOL(class_conn2cliimp);
 747
 748/* Export management functions */
 749static void class_export_destroy(struct obd_export *exp)
 750{
 751        struct obd_device *obd = exp->exp_obd;
 752
 753        LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
 754        LASSERT(obd != NULL);
 755
 756        CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
 757               exp->exp_client_uuid.uuid, obd->obd_name);
 758
 759        /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
 760        if (exp->exp_connection)
 761                ptlrpc_put_connection_superhack(exp->exp_connection);
 762
 763        LASSERT(list_empty(&exp->exp_outstanding_replies));
 764        LASSERT(list_empty(&exp->exp_uncommitted_replies));
 765        LASSERT(list_empty(&exp->exp_req_replay_queue));
 766        LASSERT(list_empty(&exp->exp_hp_rpcs));
 767        obd_destroy_export(exp);
 768        class_decref(obd, "export", exp);
 769
 770        OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
 771}
 772
 773static void export_handle_addref(void *export)
 774{
 775        class_export_get(export);
 776}
 777
 778static struct portals_handle_ops export_handle_ops = {
 779        .hop_addref = export_handle_addref,
 780        .hop_free   = NULL,
 781};
 782
 783struct obd_export *class_export_get(struct obd_export *exp)
 784{
 785        atomic_inc(&exp->exp_refcount);
 786        CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
 787               atomic_read(&exp->exp_refcount));
 788        return exp;
 789}
 790EXPORT_SYMBOL(class_export_get);
 791
 792void class_export_put(struct obd_export *exp)
 793{
 794        LASSERT(exp != NULL);
 795        LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
 796        CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
 797               atomic_read(&exp->exp_refcount) - 1);
 798
 799        if (atomic_dec_and_test(&exp->exp_refcount)) {
 800                LASSERT(!list_empty(&exp->exp_obd_chain));
 801                CDEBUG(D_IOCTL, "final put %p/%s\n",
 802                       exp, exp->exp_client_uuid.uuid);
 803
 804                /* release nid stat refererence */
 805                lprocfs_exp_cleanup(exp);
 806
 807                obd_zombie_export_add(exp);
 808        }
 809}
 810EXPORT_SYMBOL(class_export_put);
 811
 812/* Creates a new export, adds it to the hash table, and returns a
 813 * pointer to it. The refcount is 2: one for the hash reference, and
 814 * one for the pointer returned by this function. */
 815struct obd_export *class_new_export(struct obd_device *obd,
 816                                    struct obd_uuid *cluuid)
 817{
 818        struct obd_export *export;
 819        struct cfs_hash *hash = NULL;
 820        int rc = 0;
 821
 822        OBD_ALLOC_PTR(export);
 823        if (!export)
 824                return ERR_PTR(-ENOMEM);
 825
 826        export->exp_conn_cnt = 0;
 827        export->exp_lock_hash = NULL;
 828        export->exp_flock_hash = NULL;
 829        atomic_set(&export->exp_refcount, 2);
 830        atomic_set(&export->exp_rpc_count, 0);
 831        atomic_set(&export->exp_cb_count, 0);
 832        atomic_set(&export->exp_locks_count, 0);
 833#if LUSTRE_TRACKS_LOCK_EXP_REFS
 834        INIT_LIST_HEAD(&export->exp_locks_list);
 835        spin_lock_init(&export->exp_locks_list_guard);
 836#endif
 837        atomic_set(&export->exp_replay_count, 0);
 838        export->exp_obd = obd;
 839        INIT_LIST_HEAD(&export->exp_outstanding_replies);
 840        spin_lock_init(&export->exp_uncommitted_replies_lock);
 841        INIT_LIST_HEAD(&export->exp_uncommitted_replies);
 842        INIT_LIST_HEAD(&export->exp_req_replay_queue);
 843        INIT_LIST_HEAD(&export->exp_handle.h_link);
 844        INIT_LIST_HEAD(&export->exp_hp_rpcs);
 845        class_handle_hash(&export->exp_handle, &export_handle_ops);
 846        export->exp_last_request_time = cfs_time_current_sec();
 847        spin_lock_init(&export->exp_lock);
 848        spin_lock_init(&export->exp_rpc_lock);
 849        INIT_HLIST_NODE(&export->exp_uuid_hash);
 850        INIT_HLIST_NODE(&export->exp_nid_hash);
 851        spin_lock_init(&export->exp_bl_list_lock);
 852        INIT_LIST_HEAD(&export->exp_bl_list);
 853
 854        export->exp_sp_peer = LUSTRE_SP_ANY;
 855        export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
 856        export->exp_client_uuid = *cluuid;
 857        obd_init_export(export);
 858
 859        spin_lock(&obd->obd_dev_lock);
 860        /* shouldn't happen, but might race */
 861        if (obd->obd_stopping)
 862                GOTO(exit_unlock, rc = -ENODEV);
 863
 864        hash = cfs_hash_getref(obd->obd_uuid_hash);
 865        if (hash == NULL)
 866                GOTO(exit_unlock, rc = -ENODEV);
 867        spin_unlock(&obd->obd_dev_lock);
 868
 869        if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
 870                rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
 871                if (rc != 0) {
 872                        LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
 873                                      obd->obd_name, cluuid->uuid, rc);
 874                        GOTO(exit_err, rc = -EALREADY);
 875                }
 876        }
 877
 878        spin_lock(&obd->obd_dev_lock);
 879        if (obd->obd_stopping) {
 880                cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
 881                GOTO(exit_unlock, rc = -ENODEV);
 882        }
 883
 884        class_incref(obd, "export", export);
 885        list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
 886        list_add_tail(&export->exp_obd_chain_timed,
 887                          &export->exp_obd->obd_exports_timed);
 888        export->exp_obd->obd_num_exports++;
 889        spin_unlock(&obd->obd_dev_lock);
 890        cfs_hash_putref(hash);
 891        return export;
 892
 893exit_unlock:
 894        spin_unlock(&obd->obd_dev_lock);
 895exit_err:
 896        if (hash)
 897                cfs_hash_putref(hash);
 898        class_handle_unhash(&export->exp_handle);
 899        LASSERT(hlist_unhashed(&export->exp_uuid_hash));
 900        obd_destroy_export(export);
 901        OBD_FREE_PTR(export);
 902        return ERR_PTR(rc);
 903}
 904EXPORT_SYMBOL(class_new_export);
 905
 906void class_unlink_export(struct obd_export *exp)
 907{
 908        class_handle_unhash(&exp->exp_handle);
 909
 910        spin_lock(&exp->exp_obd->obd_dev_lock);
 911        /* delete an uuid-export hashitem from hashtables */
 912        if (!hlist_unhashed(&exp->exp_uuid_hash))
 913                cfs_hash_del(exp->exp_obd->obd_uuid_hash,
 914                             &exp->exp_client_uuid,
 915                             &exp->exp_uuid_hash);
 916
 917        list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
 918        list_del_init(&exp->exp_obd_chain_timed);
 919        exp->exp_obd->obd_num_exports--;
 920        spin_unlock(&exp->exp_obd->obd_dev_lock);
 921        class_export_put(exp);
 922}
 923EXPORT_SYMBOL(class_unlink_export);
 924
 925/* Import management functions */
 926void class_import_destroy(struct obd_import *imp)
 927{
 928        CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
 929                imp->imp_obd->obd_name);
 930
 931        LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
 932
 933        ptlrpc_put_connection_superhack(imp->imp_connection);
 934
 935        while (!list_empty(&imp->imp_conn_list)) {
 936                struct obd_import_conn *imp_conn;
 937
 938                imp_conn = list_entry(imp->imp_conn_list.next,
 939                                          struct obd_import_conn, oic_item);
 940                list_del_init(&imp_conn->oic_item);
 941                ptlrpc_put_connection_superhack(imp_conn->oic_conn);
 942                OBD_FREE(imp_conn, sizeof(*imp_conn));
 943        }
 944
 945        LASSERT(imp->imp_sec == NULL);
 946        class_decref(imp->imp_obd, "import", imp);
 947        OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
 948}
 949
 950static void import_handle_addref(void *import)
 951{
 952        class_import_get(import);
 953}
 954
 955static struct portals_handle_ops import_handle_ops = {
 956        .hop_addref = import_handle_addref,
 957        .hop_free   = NULL,
 958};
 959
 960struct obd_import *class_import_get(struct obd_import *import)
 961{
 962        atomic_inc(&import->imp_refcount);
 963        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
 964               atomic_read(&import->imp_refcount),
 965               import->imp_obd->obd_name);
 966        return import;
 967}
 968EXPORT_SYMBOL(class_import_get);
 969
 970void class_import_put(struct obd_import *imp)
 971{
 972        LASSERT(list_empty(&imp->imp_zombie_chain));
 973        LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
 974
 975        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
 976               atomic_read(&imp->imp_refcount) - 1,
 977               imp->imp_obd->obd_name);
 978
 979        if (atomic_dec_and_test(&imp->imp_refcount)) {
 980                CDEBUG(D_INFO, "final put import %p\n", imp);
 981                obd_zombie_import_add(imp);
 982        }
 983
 984        /* catch possible import put race */
 985        LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
 986}
 987EXPORT_SYMBOL(class_import_put);
 988
 989static void init_imp_at(struct imp_at *at) {
 990        int i;
 991        at_init(&at->iat_net_latency, 0, 0);
 992        for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
 993                /* max service estimates are tracked on the server side, so
 994                   don't use the AT history here, just use the last reported
 995                   val. (But keep hist for proc histogram, worst_ever) */
 996                at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
 997                        AT_FLG_NOHIST);
 998        }
 999}
1000
1001struct obd_import *class_new_import(struct obd_device *obd)
1002{
1003        struct obd_import *imp;
1004
1005        OBD_ALLOC(imp, sizeof(*imp));
1006        if (imp == NULL)
1007                return NULL;
1008
1009        INIT_LIST_HEAD(&imp->imp_pinger_chain);
1010        INIT_LIST_HEAD(&imp->imp_zombie_chain);
1011        INIT_LIST_HEAD(&imp->imp_replay_list);
1012        INIT_LIST_HEAD(&imp->imp_sending_list);
1013        INIT_LIST_HEAD(&imp->imp_delayed_list);
1014        spin_lock_init(&imp->imp_lock);
1015        imp->imp_last_success_conn = 0;
1016        imp->imp_state = LUSTRE_IMP_NEW;
1017        imp->imp_obd = class_incref(obd, "import", imp);
1018        mutex_init(&imp->imp_sec_mutex);
1019        init_waitqueue_head(&imp->imp_recovery_waitq);
1020
1021        atomic_set(&imp->imp_refcount, 2);
1022        atomic_set(&imp->imp_unregistering, 0);
1023        atomic_set(&imp->imp_inflight, 0);
1024        atomic_set(&imp->imp_replay_inflight, 0);
1025        atomic_set(&imp->imp_inval_count, 0);
1026        INIT_LIST_HEAD(&imp->imp_conn_list);
1027        INIT_LIST_HEAD(&imp->imp_handle.h_link);
1028        class_handle_hash(&imp->imp_handle, &import_handle_ops);
1029        init_imp_at(&imp->imp_at);
1030
1031        /* the default magic is V2, will be used in connect RPC, and
1032         * then adjusted according to the flags in request/reply. */
1033        imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1034
1035        return imp;
1036}
1037EXPORT_SYMBOL(class_new_import);
1038
1039void class_destroy_import(struct obd_import *import)
1040{
1041        LASSERT(import != NULL);
1042        LASSERT(import != LP_POISON);
1043
1044        class_handle_unhash(&import->imp_handle);
1045
1046        spin_lock(&import->imp_lock);
1047        import->imp_generation++;
1048        spin_unlock(&import->imp_lock);
1049        class_import_put(import);
1050}
1051EXPORT_SYMBOL(class_destroy_import);
1052
1053#if LUSTRE_TRACKS_LOCK_EXP_REFS
1054
1055void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1056{
1057        spin_lock(&exp->exp_locks_list_guard);
1058
1059        LASSERT(lock->l_exp_refs_nr >= 0);
1060
1061        if (lock->l_exp_refs_target != NULL &&
1062            lock->l_exp_refs_target != exp) {
1063                LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1064                              exp, lock, lock->l_exp_refs_target);
1065        }
1066        if ((lock->l_exp_refs_nr ++) == 0) {
1067                list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1068                lock->l_exp_refs_target = exp;
1069        }
1070        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1071               lock, exp, lock->l_exp_refs_nr);
1072        spin_unlock(&exp->exp_locks_list_guard);
1073}
1074EXPORT_SYMBOL(__class_export_add_lock_ref);
1075
1076void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1077{
1078        spin_lock(&exp->exp_locks_list_guard);
1079        LASSERT(lock->l_exp_refs_nr > 0);
1080        if (lock->l_exp_refs_target != exp) {
1081                LCONSOLE_WARN("lock %p, "
1082                              "mismatching export pointers: %p, %p\n",
1083                              lock, lock->l_exp_refs_target, exp);
1084        }
1085        if (-- lock->l_exp_refs_nr == 0) {
1086                list_del_init(&lock->l_exp_refs_link);
1087                lock->l_exp_refs_target = NULL;
1088        }
1089        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1090               lock, exp, lock->l_exp_refs_nr);
1091        spin_unlock(&exp->exp_locks_list_guard);
1092}
1093EXPORT_SYMBOL(__class_export_del_lock_ref);
1094#endif
1095
1096/* A connection defines an export context in which preallocation can
1097   be managed. This releases the export pointer reference, and returns
1098   the export handle, so the export refcount is 1 when this function
1099   returns. */
1100int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1101                  struct obd_uuid *cluuid)
1102{
1103        struct obd_export *export;
1104        LASSERT(conn != NULL);
1105        LASSERT(obd != NULL);
1106        LASSERT(cluuid != NULL);
1107
1108        export = class_new_export(obd, cluuid);
1109        if (IS_ERR(export))
1110                return PTR_ERR(export);
1111
1112        conn->cookie = export->exp_handle.h_cookie;
1113        class_export_put(export);
1114
1115        CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1116               cluuid->uuid, conn->cookie);
1117        return 0;
1118}
1119EXPORT_SYMBOL(class_connect);
1120
1121/* if export is involved in recovery then clean up related things */
1122void class_export_recovery_cleanup(struct obd_export *exp)
1123{
1124        struct obd_device *obd = exp->exp_obd;
1125
1126        spin_lock(&obd->obd_recovery_task_lock);
1127        if (exp->exp_delayed)
1128                obd->obd_delayed_clients--;
1129        if (obd->obd_recovering) {
1130                if (exp->exp_in_recovery) {
1131                        spin_lock(&exp->exp_lock);
1132                        exp->exp_in_recovery = 0;
1133                        spin_unlock(&exp->exp_lock);
1134                        LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1135                        atomic_dec(&obd->obd_connected_clients);
1136                }
1137
1138                /* if called during recovery then should update
1139                 * obd_stale_clients counter,
1140                 * lightweight exports are not counted */
1141                if (exp->exp_failed &&
1142                    (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1143                        exp->exp_obd->obd_stale_clients++;
1144        }
1145        spin_unlock(&obd->obd_recovery_task_lock);
1146        /** Cleanup req replay fields */
1147        if (exp->exp_req_replay_needed) {
1148                spin_lock(&exp->exp_lock);
1149                exp->exp_req_replay_needed = 0;
1150                spin_unlock(&exp->exp_lock);
1151                LASSERT(atomic_read(&obd->obd_req_replay_clients));
1152                atomic_dec(&obd->obd_req_replay_clients);
1153        }
1154        /** Cleanup lock replay data */
1155        if (exp->exp_lock_replay_needed) {
1156                spin_lock(&exp->exp_lock);
1157                exp->exp_lock_replay_needed = 0;
1158                spin_unlock(&exp->exp_lock);
1159                LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1160                atomic_dec(&obd->obd_lock_replay_clients);
1161        }
1162}
1163
1164/* This function removes 1-3 references from the export:
1165 * 1 - for export pointer passed
1166 * and if disconnect really need
1167 * 2 - removing from hash
1168 * 3 - in client_unlink_export
1169 * The export pointer passed to this function can destroyed */
1170int class_disconnect(struct obd_export *export)
1171{
1172        int already_disconnected;
1173
1174        if (export == NULL) {
1175                CWARN("attempting to free NULL export %p\n", export);
1176                return -EINVAL;
1177        }
1178
1179        spin_lock(&export->exp_lock);
1180        already_disconnected = export->exp_disconnected;
1181        export->exp_disconnected = 1;
1182        spin_unlock(&export->exp_lock);
1183
1184        /* class_cleanup(), abort_recovery(), and class_fail_export()
1185         * all end up in here, and if any of them race we shouldn't
1186         * call extra class_export_puts(). */
1187        if (already_disconnected) {
1188                LASSERT(hlist_unhashed(&export->exp_nid_hash));
1189                GOTO(no_disconn, already_disconnected);
1190        }
1191
1192        CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1193               export->exp_handle.h_cookie);
1194
1195        if (!hlist_unhashed(&export->exp_nid_hash))
1196                cfs_hash_del(export->exp_obd->obd_nid_hash,
1197                             &export->exp_connection->c_peer.nid,
1198                             &export->exp_nid_hash);
1199
1200        class_export_recovery_cleanup(export);
1201        class_unlink_export(export);
1202no_disconn:
1203        class_export_put(export);
1204        return 0;
1205}
1206EXPORT_SYMBOL(class_disconnect);
1207
1208/* Return non-zero for a fully connected export */
1209int class_connected_export(struct obd_export *exp)
1210{
1211        if (exp) {
1212                int connected;
1213                spin_lock(&exp->exp_lock);
1214                connected = (exp->exp_conn_cnt > 0);
1215                spin_unlock(&exp->exp_lock);
1216                return connected;
1217        }
1218        return 0;
1219}
1220EXPORT_SYMBOL(class_connected_export);
1221
1222static void class_disconnect_export_list(struct list_head *list,
1223                                         enum obd_option flags)
1224{
1225        int rc;
1226        struct obd_export *exp;
1227
1228        /* It's possible that an export may disconnect itself, but
1229         * nothing else will be added to this list. */
1230        while (!list_empty(list)) {
1231                exp = list_entry(list->next, struct obd_export,
1232                                     exp_obd_chain);
1233                /* need for safe call CDEBUG after obd_disconnect */
1234                class_export_get(exp);
1235
1236                spin_lock(&exp->exp_lock);
1237                exp->exp_flags = flags;
1238                spin_unlock(&exp->exp_lock);
1239
1240                if (obd_uuid_equals(&exp->exp_client_uuid,
1241                                    &exp->exp_obd->obd_uuid)) {
1242                        CDEBUG(D_HA,
1243                               "exp %p export uuid == obd uuid, don't discon\n",
1244                               exp);
1245                        /* Need to delete this now so we don't end up pointing
1246                         * to work_list later when this export is cleaned up. */
1247                        list_del_init(&exp->exp_obd_chain);
1248                        class_export_put(exp);
1249                        continue;
1250                }
1251
1252                class_export_get(exp);
1253                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1254                       "last request at "CFS_TIME_T"\n",
1255                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
1256                       exp, exp->exp_last_request_time);
1257                /* release one export reference anyway */
1258                rc = obd_disconnect(exp);
1259
1260                CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1261                       obd_export_nid2str(exp), exp, rc);
1262                class_export_put(exp);
1263        }
1264}
1265
1266void class_disconnect_exports(struct obd_device *obd)
1267{
1268        struct list_head work_list;
1269
1270        /* Move all of the exports from obd_exports to a work list, en masse. */
1271        INIT_LIST_HEAD(&work_list);
1272        spin_lock(&obd->obd_dev_lock);
1273        list_splice_init(&obd->obd_exports, &work_list);
1274        list_splice_init(&obd->obd_delayed_exports, &work_list);
1275        spin_unlock(&obd->obd_dev_lock);
1276
1277        if (!list_empty(&work_list)) {
1278                CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1279                       "disconnecting them\n", obd->obd_minor, obd);
1280                class_disconnect_export_list(&work_list,
1281                                             exp_flags_from_obd(obd));
1282        } else
1283                CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1284                       obd->obd_minor, obd);
1285}
1286EXPORT_SYMBOL(class_disconnect_exports);
1287
1288/* Remove exports that have not completed recovery.
1289 */
1290void class_disconnect_stale_exports(struct obd_device *obd,
1291                                    int (*test_export)(struct obd_export *))
1292{
1293        struct list_head work_list;
1294        struct obd_export *exp, *n;
1295        int evicted = 0;
1296
1297        INIT_LIST_HEAD(&work_list);
1298        spin_lock(&obd->obd_dev_lock);
1299        list_for_each_entry_safe(exp, n, &obd->obd_exports,
1300                                     exp_obd_chain) {
1301                /* don't count self-export as client */
1302                if (obd_uuid_equals(&exp->exp_client_uuid,
1303                                    &exp->exp_obd->obd_uuid))
1304                        continue;
1305
1306                /* don't evict clients which have no slot in last_rcvd
1307                 * (e.g. lightweight connection) */
1308                if (exp->exp_target_data.ted_lr_idx == -1)
1309                        continue;
1310
1311                spin_lock(&exp->exp_lock);
1312                if (exp->exp_failed || test_export(exp)) {
1313                        spin_unlock(&exp->exp_lock);
1314                        continue;
1315                }
1316                exp->exp_failed = 1;
1317                spin_unlock(&exp->exp_lock);
1318
1319                list_move(&exp->exp_obd_chain, &work_list);
1320                evicted++;
1321                CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1322                       obd->obd_name, exp->exp_client_uuid.uuid,
1323                       exp->exp_connection == NULL ? "<unknown>" :
1324                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
1325                print_export_data(exp, "EVICTING", 0);
1326        }
1327        spin_unlock(&obd->obd_dev_lock);
1328
1329        if (evicted)
1330                LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1331                              obd->obd_name, evicted);
1332
1333        class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1334                                                 OBD_OPT_ABORT_RECOV);
1335}
1336EXPORT_SYMBOL(class_disconnect_stale_exports);
1337
1338void class_fail_export(struct obd_export *exp)
1339{
1340        int rc, already_failed;
1341
1342        spin_lock(&exp->exp_lock);
1343        already_failed = exp->exp_failed;
1344        exp->exp_failed = 1;
1345        spin_unlock(&exp->exp_lock);
1346
1347        if (already_failed) {
1348                CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1349                       exp, exp->exp_client_uuid.uuid);
1350                return;
1351        }
1352
1353        CDEBUG(D_HA, "disconnecting export %p/%s\n",
1354               exp, exp->exp_client_uuid.uuid);
1355
1356        if (obd_dump_on_timeout)
1357                libcfs_debug_dumplog();
1358
1359        /* need for safe call CDEBUG after obd_disconnect */
1360        class_export_get(exp);
1361
1362        /* Most callers into obd_disconnect are removing their own reference
1363         * (request, for example) in addition to the one from the hash table.
1364         * We don't have such a reference here, so make one. */
1365        class_export_get(exp);
1366        rc = obd_disconnect(exp);
1367        if (rc)
1368                CERROR("disconnecting export %p failed: %d\n", exp, rc);
1369        else
1370                CDEBUG(D_HA, "disconnected export %p/%s\n",
1371                       exp, exp->exp_client_uuid.uuid);
1372        class_export_put(exp);
1373}
1374EXPORT_SYMBOL(class_fail_export);
1375
1376char *obd_export_nid2str(struct obd_export *exp)
1377{
1378        if (exp->exp_connection != NULL)
1379                return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1380
1381        return "(no nid)";
1382}
1383EXPORT_SYMBOL(obd_export_nid2str);
1384
1385int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1386{
1387        struct cfs_hash *nid_hash;
1388        struct obd_export *doomed_exp = NULL;
1389        int exports_evicted = 0;
1390
1391        lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1392
1393        spin_lock(&obd->obd_dev_lock);
1394        /* umount has run already, so evict thread should leave
1395         * its task to umount thread now */
1396        if (obd->obd_stopping) {
1397                spin_unlock(&obd->obd_dev_lock);
1398                return exports_evicted;
1399        }
1400        nid_hash = obd->obd_nid_hash;
1401        cfs_hash_getref(nid_hash);
1402        spin_unlock(&obd->obd_dev_lock);
1403
1404        do {
1405                doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1406                if (doomed_exp == NULL)
1407                        break;
1408
1409                LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1410                         "nid %s found, wanted nid %s, requested nid %s\n",
1411                         obd_export_nid2str(doomed_exp),
1412                         libcfs_nid2str(nid_key), nid);
1413                LASSERTF(doomed_exp != obd->obd_self_export,
1414                         "self-export is hashed by NID?\n");
1415                exports_evicted++;
1416                LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1417                              "request\n", obd->obd_name,
1418                              obd_uuid2str(&doomed_exp->exp_client_uuid),
1419                              obd_export_nid2str(doomed_exp));
1420                class_fail_export(doomed_exp);
1421                class_export_put(doomed_exp);
1422        } while (1);
1423
1424        cfs_hash_putref(nid_hash);
1425
1426        if (!exports_evicted)
1427                CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1428                       obd->obd_name, nid);
1429        return exports_evicted;
1430}
1431EXPORT_SYMBOL(obd_export_evict_by_nid);
1432
1433int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1434{
1435        struct cfs_hash *uuid_hash;
1436        struct obd_export *doomed_exp = NULL;
1437        struct obd_uuid doomed_uuid;
1438        int exports_evicted = 0;
1439
1440        spin_lock(&obd->obd_dev_lock);
1441        if (obd->obd_stopping) {
1442                spin_unlock(&obd->obd_dev_lock);
1443                return exports_evicted;
1444        }
1445        uuid_hash = obd->obd_uuid_hash;
1446        cfs_hash_getref(uuid_hash);
1447        spin_unlock(&obd->obd_dev_lock);
1448
1449        obd_str2uuid(&doomed_uuid, uuid);
1450        if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1451                CERROR("%s: can't evict myself\n", obd->obd_name);
1452                cfs_hash_putref(uuid_hash);
1453                return exports_evicted;
1454        }
1455
1456        doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1457
1458        if (doomed_exp == NULL) {
1459                CERROR("%s: can't disconnect %s: no exports found\n",
1460                       obd->obd_name, uuid);
1461        } else {
1462                CWARN("%s: evicting %s at administrative request\n",
1463                       obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1464                class_fail_export(doomed_exp);
1465                class_export_put(doomed_exp);
1466                exports_evicted++;
1467        }
1468        cfs_hash_putref(uuid_hash);
1469
1470        return exports_evicted;
1471}
1472EXPORT_SYMBOL(obd_export_evict_by_uuid);
1473
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional hook invoked by print_export_data() when lock dumping is
 * requested; NULL (the implicit initial value) means no hook is installed.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1478
1479static void print_export_data(struct obd_export *exp, const char *status,
1480                              int locks)
1481{
1482        struct ptlrpc_reply_state *rs;
1483        struct ptlrpc_reply_state *first_reply = NULL;
1484        int nreplies = 0;
1485
1486        spin_lock(&exp->exp_lock);
1487        list_for_each_entry(rs, &exp->exp_outstanding_replies,
1488                                rs_exp_list) {
1489                if (nreplies == 0)
1490                        first_reply = rs;
1491                nreplies++;
1492        }
1493        spin_unlock(&exp->exp_lock);
1494
1495        CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1496               exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1497               obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1498               atomic_read(&exp->exp_rpc_count),
1499               atomic_read(&exp->exp_cb_count),
1500               atomic_read(&exp->exp_locks_count),
1501               exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1502               nreplies, first_reply, nreplies > 3 ? "..." : "",
1503               exp->exp_last_committed);
1504#if LUSTRE_TRACKS_LOCK_EXP_REFS
1505        if (locks && class_export_dump_hook != NULL)
1506                class_export_dump_hook(exp);
1507#endif
1508}
1509
1510void dump_exports(struct obd_device *obd, int locks)
1511{
1512        struct obd_export *exp;
1513
1514        spin_lock(&obd->obd_dev_lock);
1515        list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1516                print_export_data(exp, "ACTIVE", locks);
1517        list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1518                print_export_data(exp, "UNLINKED", locks);
1519        list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1520                print_export_data(exp, "DELAYED", locks);
1521        spin_unlock(&obd->obd_dev_lock);
1522        spin_lock(&obd_zombie_impexp_lock);
1523        list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1524                print_export_data(exp, "ZOMBIE", locks);
1525        spin_unlock(&obd_zombie_impexp_lock);
1526}
1527EXPORT_SYMBOL(dump_exports);
1528
1529void obd_exports_barrier(struct obd_device *obd)
1530{
1531        int waited = 2;
1532        LASSERT(list_empty(&obd->obd_exports));
1533        spin_lock(&obd->obd_dev_lock);
1534        while (!list_empty(&obd->obd_unlinked_exports)) {
1535                spin_unlock(&obd->obd_dev_lock);
1536                schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1537                                                   cfs_time_seconds(waited));
1538                if (waited > 5 && IS_PO2(waited)) {
1539                        LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1540                                      "more than %d seconds. "
1541                                      "The obd refcount = %d. Is it stuck?\n",
1542                                      obd->obd_name, waited,
1543                                      atomic_read(&obd->obd_refcount));
1544                        dump_exports(obd, 1);
1545                }
1546                waited *= 2;
1547                spin_lock(&obd->obd_dev_lock);
1548        }
1549        spin_unlock(&obd->obd_dev_lock);
1550}
1551EXPORT_SYMBOL(obd_exports_barrier);
1552
/*
 * Number of zombie imports and exports still waiting to be destroyed.
 * Modified under obd_zombie_impexp_lock; starts at zero (static storage).
 */
static int zombies_count;
1555
1556/**
1557 * kill zombie imports and exports
1558 */
1559void obd_zombie_impexp_cull(void)
1560{
1561        struct obd_import *import;
1562        struct obd_export *export;
1563
1564        do {
1565                spin_lock(&obd_zombie_impexp_lock);
1566
1567                import = NULL;
1568                if (!list_empty(&obd_zombie_imports)) {
1569                        import = list_entry(obd_zombie_imports.next,
1570                                                struct obd_import,
1571                                                imp_zombie_chain);
1572                        list_del_init(&import->imp_zombie_chain);
1573                }
1574
1575                export = NULL;
1576                if (!list_empty(&obd_zombie_exports)) {
1577                        export = list_entry(obd_zombie_exports.next,
1578                                                struct obd_export,
1579                                                exp_obd_chain);
1580                        list_del_init(&export->exp_obd_chain);
1581                }
1582
1583                spin_unlock(&obd_zombie_impexp_lock);
1584
1585                if (import != NULL) {
1586                        class_import_destroy(import);
1587                        spin_lock(&obd_zombie_impexp_lock);
1588                        zombies_count--;
1589                        spin_unlock(&obd_zombie_impexp_lock);
1590                }
1591
1592                if (export != NULL) {
1593                        class_export_destroy(export);
1594                        spin_lock(&obd_zombie_impexp_lock);
1595                        zombies_count--;
1596                        spin_unlock(&obd_zombie_impexp_lock);
1597                }
1598
1599                cond_resched();
1600        } while (import != NULL || export != NULL);
1601}
1602
1603static struct completion        obd_zombie_start;
1604static struct completion        obd_zombie_stop;
1605static unsigned long            obd_zombie_flags;
1606static wait_queue_head_t                obd_zombie_waitq;
1607static pid_t                    obd_zombie_pid;
1608
1609enum {
1610        OBD_ZOMBIE_STOP         = 0x0001,
1611};
1612
1613/**
1614 * check for work for kill zombie import/export thread.
1615 */
1616static int obd_zombie_impexp_check(void *arg)
1617{
1618        int rc;
1619
1620        spin_lock(&obd_zombie_impexp_lock);
1621        rc = (zombies_count == 0) &&
1622             !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1623        spin_unlock(&obd_zombie_impexp_lock);
1624
1625        return rc;
1626}
1627
1628/**
1629 * Add export to the obd_zombe thread and notify it.
1630 */
1631static void obd_zombie_export_add(struct obd_export *exp) {
1632        spin_lock(&exp->exp_obd->obd_dev_lock);
1633        LASSERT(!list_empty(&exp->exp_obd_chain));
1634        list_del_init(&exp->exp_obd_chain);
1635        spin_unlock(&exp->exp_obd->obd_dev_lock);
1636        spin_lock(&obd_zombie_impexp_lock);
1637        zombies_count++;
1638        list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1639        spin_unlock(&obd_zombie_impexp_lock);
1640
1641        obd_zombie_impexp_notify();
1642}
1643
1644/**
1645 * Add import to the obd_zombe thread and notify it.
1646 */
1647static void obd_zombie_import_add(struct obd_import *imp) {
1648        LASSERT(imp->imp_sec == NULL);
1649        LASSERT(imp->imp_rq_pool == NULL);
1650        spin_lock(&obd_zombie_impexp_lock);
1651        LASSERT(list_empty(&imp->imp_zombie_chain));
1652        zombies_count++;
1653        list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1654        spin_unlock(&obd_zombie_impexp_lock);
1655
1656        obd_zombie_impexp_notify();
1657}
1658
1659/**
1660 * notify import/export destroy thread about new zombie.
1661 */
1662static void obd_zombie_impexp_notify(void)
1663{
1664        /*
1665         * Make sure obd_zomebie_impexp_thread get this notification.
1666         * It is possible this signal only get by obd_zombie_barrier, and
1667         * barrier gulps this notification and sleeps away and hangs ensues
1668         */
1669        wake_up_all(&obd_zombie_waitq);
1670}
1671
1672/**
1673 * check whether obd_zombie is idle
1674 */
1675static int obd_zombie_is_idle(void)
1676{
1677        int rc;
1678
1679        LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1680        spin_lock(&obd_zombie_impexp_lock);
1681        rc = (zombies_count == 0);
1682        spin_unlock(&obd_zombie_impexp_lock);
1683        return rc;
1684}
1685
1686/**
1687 * wait when obd_zombie import/export queues become empty
1688 */
1689void obd_zombie_barrier(void)
1690{
1691        struct l_wait_info lwi = { 0 };
1692
1693        if (obd_zombie_pid == current_pid())
1694                /* don't wait for myself */
1695                return;
1696        l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1697}
1698EXPORT_SYMBOL(obd_zombie_barrier);
1699
1700
1701/**
1702 * destroy zombie export/import thread.
1703 */
1704static int obd_zombie_impexp_thread(void *unused)
1705{
1706        unshare_fs_struct();
1707        complete(&obd_zombie_start);
1708
1709        obd_zombie_pid = current_pid();
1710
1711        while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1712                struct l_wait_info lwi = { 0 };
1713
1714                l_wait_event(obd_zombie_waitq,
1715                             !obd_zombie_impexp_check(NULL), &lwi);
1716                obd_zombie_impexp_cull();
1717
1718                /*
1719                 * Notify obd_zombie_barrier callers that queues
1720                 * may be empty.
1721                 */
1722                wake_up(&obd_zombie_waitq);
1723        }
1724
1725        complete(&obd_zombie_stop);
1726
1727        return 0;
1728}
1729
1730
1731/**
1732 * start destroy zombie import/export thread
1733 */
1734int obd_zombie_impexp_init(void)
1735{
1736        struct task_struct *task;
1737
1738        INIT_LIST_HEAD(&obd_zombie_imports);
1739        INIT_LIST_HEAD(&obd_zombie_exports);
1740        spin_lock_init(&obd_zombie_impexp_lock);
1741        init_completion(&obd_zombie_start);
1742        init_completion(&obd_zombie_stop);
1743        init_waitqueue_head(&obd_zombie_waitq);
1744        obd_zombie_pid = 0;
1745
1746        task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1747        if (IS_ERR(task))
1748                return PTR_ERR(task);
1749
1750        wait_for_completion(&obd_zombie_start);
1751        return 0;
1752}
1753/**
1754 * stop destroy zombie import/export thread
1755 */
1756void obd_zombie_impexp_stop(void)
1757{
1758        set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1759        obd_zombie_impexp_notify();
1760        wait_for_completion(&obd_zombie_stop);
1761}
1762
1763/***** Kernel-userspace comm helpers *******/
1764
1765/* Get length of entire message, including header */
1766int kuc_len(int payload_len)
1767{
1768        return sizeof(struct kuc_hdr) + payload_len;
1769}
1770EXPORT_SYMBOL(kuc_len);
1771
1772/* Get a pointer to kuc header, given a ptr to the payload
1773 * @param p Pointer to payload area
1774 * @returns Pointer to kuc header
1775 */
1776struct kuc_hdr * kuc_ptr(void *p)
1777{
1778        struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1779        LASSERT(lh->kuc_magic == KUC_MAGIC);
1780        return lh;
1781}
1782EXPORT_SYMBOL(kuc_ptr);
1783
1784/* Test if payload is part of kuc message
1785 * @param p Pointer to payload area
1786 * @returns boolean
1787 */
1788int kuc_ispayload(void *p)
1789{
1790        struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1791
1792        if (kh->kuc_magic == KUC_MAGIC)
1793                return 1;
1794        else
1795                return 0;
1796}
1797EXPORT_SYMBOL(kuc_ispayload);
1798
1799/* Alloc space for a message, and fill in header
1800 * @return Pointer to payload area
1801 */
1802void *kuc_alloc(int payload_len, int transport, int type)
1803{
1804        struct kuc_hdr *lh;
1805        int len = kuc_len(payload_len);
1806
1807        OBD_ALLOC(lh, len);
1808        if (lh == NULL)
1809                return ERR_PTR(-ENOMEM);
1810
1811        lh->kuc_magic = KUC_MAGIC;
1812        lh->kuc_transport = transport;
1813        lh->kuc_msgtype = type;
1814        lh->kuc_msglen = len;
1815
1816        return (void *)(lh + 1);
1817}
1818EXPORT_SYMBOL(kuc_alloc);
1819
/* Free a message, given a pointer to its payload area
 * @param p Pointer to payload area; the header in front of it is located
 *          with kuc_ptr(), which asserts the header magic
 * @param payload_len Length of the payload area; it is used to compute the
 *          size passed to OBD_FREE, so it should be the same value that
 *          was passed to kuc_alloc()
 *
 * This is an exported symbol, so it is defined as an ordinary external
 * function: a plain "inline" here (neither static nor extern) would make
 * the emission of the out-of-line definition depend on which inline
 * semantics the compiler applies.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1827