linux/drivers/staging/lustre/lustre/obdclass/genops.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 *
  32 * lustre/obdclass/genops.c
  33 *
  34 * These are the only exported functions, they provide some generic
  35 * infrastructure for managing object devices
  36 */
  37
  38#define DEBUG_SUBSYSTEM S_CLASS
  39#include "../include/obd_class.h"
  40#include "../include/lprocfs_status.h"
  41#include "../include/lustre_kernelcomm.h"
  42
  43spinlock_t obd_types_lock;
  44
  45static struct kmem_cache *obd_device_cachep;
  46struct kmem_cache *obdo_cachep;
  47EXPORT_SYMBOL(obdo_cachep);
  48static struct kmem_cache *import_cachep;
  49
  50static struct list_head      obd_zombie_imports;
  51static struct list_head      obd_zombie_exports;
  52static spinlock_t  obd_zombie_impexp_lock;
  53static void obd_zombie_impexp_notify(void);
  54static void obd_zombie_export_add(struct obd_export *exp);
  55static void obd_zombie_import_add(struct obd_import *imp);
  56
  57int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
  58EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
  59
  60/*
  61 * support functions: we could use inter-module communication, but this
  62 * is more portable to other OS's
  63 */
  64static struct obd_device *obd_device_alloc(void)
  65{
  66        struct obd_device *obd;
  67
  68        obd = kmem_cache_zalloc(obd_device_cachep, GFP_NOFS);
  69        if (obd)
  70                obd->obd_magic = OBD_DEVICE_MAGIC;
  71        return obd;
  72}
  73
  74static void obd_device_free(struct obd_device *obd)
  75{
  76        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
  77                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
  78        if (obd->obd_namespace) {
  79                CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
  80                       obd, obd->obd_namespace, obd->obd_force);
  81                LBUG();
  82        }
  83        lu_ref_fini(&obd->obd_reference);
  84        kmem_cache_free(obd_device_cachep, obd);
  85}
  86
  87static struct obd_type *class_search_type(const char *name)
  88{
  89        struct list_head *tmp;
  90        struct obd_type *type;
  91
  92        spin_lock(&obd_types_lock);
  93        list_for_each(tmp, &obd_types) {
  94                type = list_entry(tmp, struct obd_type, typ_chain);
  95                if (strcmp(type->typ_name, name) == 0) {
  96                        spin_unlock(&obd_types_lock);
  97                        return type;
  98                }
  99        }
 100        spin_unlock(&obd_types_lock);
 101        return NULL;
 102}
 103
 104static struct obd_type *class_get_type(const char *name)
 105{
 106        struct obd_type *type = class_search_type(name);
 107
 108        if (!type) {
 109                const char *modname = name;
 110
 111                if (!request_module("%s", modname)) {
 112                        CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
 113                        type = class_search_type(name);
 114                } else {
 115                        LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
 116                                           modname);
 117                }
 118        }
 119        if (type) {
 120                spin_lock(&type->obd_type_lock);
 121                type->typ_refcnt++;
 122                try_module_get(type->typ_dt_ops->owner);
 123                spin_unlock(&type->obd_type_lock);
 124        }
 125        return type;
 126}
 127
 128void class_put_type(struct obd_type *type)
 129{
 130        LASSERT(type);
 131        spin_lock(&type->obd_type_lock);
 132        type->typ_refcnt--;
 133        module_put(type->typ_dt_ops->owner);
 134        spin_unlock(&type->obd_type_lock);
 135}
 136EXPORT_SYMBOL(class_put_type);
 137
 138#define CLASS_MAX_NAME 1024
 139
 140int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
 141                        const char *name,
 142                        struct lu_device_type *ldt)
 143{
 144        struct obd_type *type;
 145        int rc;
 146
 147        /* sanity check */
 148        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
 149
 150        if (class_search_type(name)) {
 151                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
 152                return -EEXIST;
 153        }
 154
 155        rc = -ENOMEM;
 156        type = kzalloc(sizeof(*type), GFP_NOFS);
 157        if (!type)
 158                return rc;
 159
 160        type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
 161        type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
 162        type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
 163
 164        if (!type->typ_dt_ops ||
 165            !type->typ_md_ops ||
 166            !type->typ_name)
 167                goto failed;
 168
 169        *(type->typ_dt_ops) = *dt_ops;
 170        /* md_ops is optional */
 171        if (md_ops)
 172                *(type->typ_md_ops) = *md_ops;
 173        strcpy(type->typ_name, name);
 174        spin_lock_init(&type->obd_type_lock);
 175
 176        type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
 177                                                    debugfs_lustre_root,
 178                                                    NULL, type);
 179        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
 180                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
 181                                             : -ENOMEM;
 182                type->typ_debugfs_entry = NULL;
 183                goto failed;
 184        }
 185
 186        type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
 187        if (!type->typ_kobj) {
 188                rc = -ENOMEM;
 189                goto failed;
 190        }
 191
 192        if (ldt) {
 193                type->typ_lu = ldt;
 194                rc = lu_device_type_init(ldt);
 195                if (rc != 0)
 196                        goto failed;
 197        }
 198
 199        spin_lock(&obd_types_lock);
 200        list_add(&type->typ_chain, &obd_types);
 201        spin_unlock(&obd_types_lock);
 202
 203        return 0;
 204
 205 failed:
 206        if (type->typ_kobj)
 207                kobject_put(type->typ_kobj);
 208        kfree(type->typ_name);
 209        kfree(type->typ_md_ops);
 210        kfree(type->typ_dt_ops);
 211        kfree(type);
 212        return rc;
 213}
 214EXPORT_SYMBOL(class_register_type);
 215
 216int class_unregister_type(const char *name)
 217{
 218        struct obd_type *type = class_search_type(name);
 219
 220        if (!type) {
 221                CERROR("unknown obd type\n");
 222                return -EINVAL;
 223        }
 224
 225        if (type->typ_refcnt) {
 226                CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
 227                /* This is a bad situation, let's make the best of it */
 228                /* Remove ops, but leave the name for debugging */
 229                kfree(type->typ_dt_ops);
 230                kfree(type->typ_md_ops);
 231                return -EBUSY;
 232        }
 233
 234        if (type->typ_kobj)
 235                kobject_put(type->typ_kobj);
 236
 237        if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
 238                ldebugfs_remove(&type->typ_debugfs_entry);
 239
 240        if (type->typ_lu)
 241                lu_device_type_fini(type->typ_lu);
 242
 243        spin_lock(&obd_types_lock);
 244        list_del(&type->typ_chain);
 245        spin_unlock(&obd_types_lock);
 246        kfree(type->typ_name);
 247        kfree(type->typ_dt_ops);
 248        kfree(type->typ_md_ops);
 249        kfree(type);
 250        return 0;
 251} /* class_unregister_type */
 252EXPORT_SYMBOL(class_unregister_type);
 253
 254/**
 255 * Create a new obd device.
 256 *
 257 * Find an empty slot in ::obd_devs[], create a new obd device in it.
 258 *
 259 * \param[in] type_name obd device type string.
 260 * \param[in] name      obd device name.
 261 *
 262 * \retval NULL if create fails, otherwise return the obd device
 263 *       pointer created.
 264 */
 265struct obd_device *class_newdev(const char *type_name, const char *name)
 266{
 267        struct obd_device *result = NULL;
 268        struct obd_device *newdev;
 269        struct obd_type *type = NULL;
 270        int i;
 271        int new_obd_minor = 0;
 272
 273        if (strlen(name) >= MAX_OBD_NAME) {
 274                CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
 275                return ERR_PTR(-EINVAL);
 276        }
 277
 278        type = class_get_type(type_name);
 279        if (!type) {
 280                CERROR("OBD: unknown type: %s\n", type_name);
 281                return ERR_PTR(-ENODEV);
 282        }
 283
 284        newdev = obd_device_alloc();
 285        if (!newdev) {
 286                result = ERR_PTR(-ENOMEM);
 287                goto out_type;
 288        }
 289
 290        LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
 291
 292        write_lock(&obd_dev_lock);
 293        for (i = 0; i < class_devno_max(); i++) {
 294                struct obd_device *obd = class_num2obd(i);
 295
 296                if (obd && (strcmp(name, obd->obd_name) == 0)) {
 297                        CERROR("Device %s already exists at %d, won't add\n",
 298                               name, i);
 299                        if (result) {
 300                                LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
 301                                         "%p obd_magic %08x != %08x\n", result,
 302                                         result->obd_magic, OBD_DEVICE_MAGIC);
 303                                LASSERTF(result->obd_minor == new_obd_minor,
 304                                         "%p obd_minor %d != %d\n", result,
 305                                         result->obd_minor, new_obd_minor);
 306
 307                                obd_devs[result->obd_minor] = NULL;
 308                                result->obd_name[0] = '\0';
 309                         }
 310                        result = ERR_PTR(-EEXIST);
 311                        break;
 312                }
 313                if (!result && !obd) {
 314                        result = newdev;
 315                        result->obd_minor = i;
 316                        new_obd_minor = i;
 317                        result->obd_type = type;
 318                        strncpy(result->obd_name, name,
 319                                sizeof(result->obd_name) - 1);
 320                        obd_devs[i] = result;
 321                }
 322        }
 323        write_unlock(&obd_dev_lock);
 324
 325        if (!result && i >= class_devno_max()) {
 326                CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
 327                       class_devno_max());
 328                result = ERR_PTR(-EOVERFLOW);
 329                goto out;
 330        }
 331
 332        if (IS_ERR(result))
 333                goto out;
 334
 335        CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
 336               result->obd_name, result);
 337
 338        return result;
 339out:
 340        obd_device_free(newdev);
 341out_type:
 342        class_put_type(type);
 343        return result;
 344}
 345
 346void class_release_dev(struct obd_device *obd)
 347{
 348        struct obd_type *obd_type = obd->obd_type;
 349
 350        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
 351                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
 352        LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
 353                 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
 354        LASSERT(obd_type);
 355
 356        CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
 357               obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
 358
 359        write_lock(&obd_dev_lock);
 360        obd_devs[obd->obd_minor] = NULL;
 361        write_unlock(&obd_dev_lock);
 362        obd_device_free(obd);
 363
 364        class_put_type(obd_type);
 365}
 366
 367int class_name2dev(const char *name)
 368{
 369        int i;
 370
 371        if (!name)
 372                return -1;
 373
 374        read_lock(&obd_dev_lock);
 375        for (i = 0; i < class_devno_max(); i++) {
 376                struct obd_device *obd = class_num2obd(i);
 377
 378                if (obd && strcmp(name, obd->obd_name) == 0) {
 379                        /* Make sure we finished attaching before we give
 380                         * out any references
 381                         */
 382                        LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
 383                        if (obd->obd_attached) {
 384                                read_unlock(&obd_dev_lock);
 385                                return i;
 386                        }
 387                        break;
 388                }
 389        }
 390        read_unlock(&obd_dev_lock);
 391
 392        return -1;
 393}
 394EXPORT_SYMBOL(class_name2dev);
 395
 396struct obd_device *class_name2obd(const char *name)
 397{
 398        int dev = class_name2dev(name);
 399
 400        if (dev < 0 || dev > class_devno_max())
 401                return NULL;
 402        return class_num2obd(dev);
 403}
 404EXPORT_SYMBOL(class_name2obd);
 405
 406int class_uuid2dev(struct obd_uuid *uuid)
 407{
 408        int i;
 409
 410        read_lock(&obd_dev_lock);
 411        for (i = 0; i < class_devno_max(); i++) {
 412                struct obd_device *obd = class_num2obd(i);
 413
 414                if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
 415                        LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
 416                        read_unlock(&obd_dev_lock);
 417                        return i;
 418                }
 419        }
 420        read_unlock(&obd_dev_lock);
 421
 422        return -1;
 423}
 424EXPORT_SYMBOL(class_uuid2dev);
 425
 426/**
 427 * Get obd device from ::obd_devs[]
 428 *
 429 * \param num [in] array index
 430 *
 431 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
 432 *       otherwise return the obd device there.
 433 */
 434struct obd_device *class_num2obd(int num)
 435{
 436        struct obd_device *obd = NULL;
 437
 438        if (num < class_devno_max()) {
 439                obd = obd_devs[num];
 440                if (!obd)
 441                        return NULL;
 442
 443                LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
 444                         "%p obd_magic %08x != %08x\n",
 445                         obd, obd->obd_magic, OBD_DEVICE_MAGIC);
 446                LASSERTF(obd->obd_minor == num,
 447                         "%p obd_minor %0d != %0d\n",
 448                         obd, obd->obd_minor, num);
 449        }
 450
 451        return obd;
 452}
 453EXPORT_SYMBOL(class_num2obd);
 454
 455/* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
 456 * specified, then only the client with that uuid is returned,
 457 * otherwise any client connected to the tgt is returned.
 458 */
 459struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
 460                                         const char *typ_name,
 461                                         struct obd_uuid *grp_uuid)
 462{
 463        int i;
 464
 465        read_lock(&obd_dev_lock);
 466        for (i = 0; i < class_devno_max(); i++) {
 467                struct obd_device *obd = class_num2obd(i);
 468
 469                if (!obd)
 470                        continue;
 471                if ((strncmp(obd->obd_type->typ_name, typ_name,
 472                             strlen(typ_name)) == 0)) {
 473                        if (obd_uuid_equals(tgt_uuid,
 474                                            &obd->u.cli.cl_target_uuid) &&
 475                            ((grp_uuid) ? obd_uuid_equals(grp_uuid,
 476                                                         &obd->obd_uuid) : 1)) {
 477                                read_unlock(&obd_dev_lock);
 478                                return obd;
 479                        }
 480                }
 481        }
 482        read_unlock(&obd_dev_lock);
 483
 484        return NULL;
 485}
 486EXPORT_SYMBOL(class_find_client_obd);
 487
 488/* Iterate the obd_device list looking devices have grp_uuid. Start
 489 * searching at *next, and if a device is found, the next index to look
 490 * at is saved in *next. If next is NULL, then the first matching device
 491 * will always be returned.
 492 */
 493struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
 494{
 495        int i;
 496
 497        if (!next)
 498                i = 0;
 499        else if (*next >= 0 && *next < class_devno_max())
 500                i = *next;
 501        else
 502                return NULL;
 503
 504        read_lock(&obd_dev_lock);
 505        for (; i < class_devno_max(); i++) {
 506                struct obd_device *obd = class_num2obd(i);
 507
 508                if (!obd)
 509                        continue;
 510                if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
 511                        if (next)
 512                                *next = i+1;
 513                        read_unlock(&obd_dev_lock);
 514                        return obd;
 515                }
 516        }
 517        read_unlock(&obd_dev_lock);
 518
 519        return NULL;
 520}
 521EXPORT_SYMBOL(class_devices_in_group);
 522
 523/**
 524 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
 525 * adjust sptlrpc settings accordingly.
 526 */
 527int class_notify_sptlrpc_conf(const char *fsname, int namelen)
 528{
 529        struct obd_device  *obd;
 530        const char       *type;
 531        int              i, rc = 0, rc2;
 532
 533        LASSERT(namelen > 0);
 534
 535        read_lock(&obd_dev_lock);
 536        for (i = 0; i < class_devno_max(); i++) {
 537                obd = class_num2obd(i);
 538
 539                if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
 540                        continue;
 541
 542                /* only notify mdc, osc, mdt, ost */
 543                type = obd->obd_type->typ_name;
 544                if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
 545                    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
 546                    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
 547                    strcmp(type, LUSTRE_OST_NAME) != 0)
 548                        continue;
 549
 550                if (strncmp(obd->obd_name, fsname, namelen))
 551                        continue;
 552
 553                class_incref(obd, __func__, obd);
 554                read_unlock(&obd_dev_lock);
 555                rc2 = obd_set_info_async(NULL, obd->obd_self_export,
 556                                         sizeof(KEY_SPTLRPC_CONF),
 557                                         KEY_SPTLRPC_CONF, 0, NULL, NULL);
 558                rc = rc ? rc : rc2;
 559                class_decref(obd, __func__, obd);
 560                read_lock(&obd_dev_lock);
 561        }
 562        read_unlock(&obd_dev_lock);
 563        return rc;
 564}
 565EXPORT_SYMBOL(class_notify_sptlrpc_conf);
 566
 567void obd_cleanup_caches(void)
 568{
 569        kmem_cache_destroy(obd_device_cachep);
 570        obd_device_cachep = NULL;
 571        kmem_cache_destroy(obdo_cachep);
 572        obdo_cachep = NULL;
 573        kmem_cache_destroy(import_cachep);
 574        import_cachep = NULL;
 575}
 576
 577int obd_init_caches(void)
 578{
 579        LASSERT(!obd_device_cachep);
 580        obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
 581                                              sizeof(struct obd_device),
 582                                              0, 0, NULL);
 583        if (!obd_device_cachep)
 584                goto out;
 585
 586        LASSERT(!obdo_cachep);
 587        obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
 588                                        0, 0, NULL);
 589        if (!obdo_cachep)
 590                goto out;
 591
 592        LASSERT(!import_cachep);
 593        import_cachep = kmem_cache_create("ll_import_cache",
 594                                          sizeof(struct obd_import),
 595                                          0, 0, NULL);
 596        if (!import_cachep)
 597                goto out;
 598
 599        return 0;
 600 out:
 601        obd_cleanup_caches();
 602        return -ENOMEM;
 603}
 604
 605/* map connection to client */
 606struct obd_export *class_conn2export(struct lustre_handle *conn)
 607{
 608        struct obd_export *export;
 609
 610        if (!conn) {
 611                CDEBUG(D_CACHE, "looking for null handle\n");
 612                return NULL;
 613        }
 614
 615        if (conn->cookie == -1) {  /* this means assign a new connection */
 616                CDEBUG(D_CACHE, "want a new connection\n");
 617                return NULL;
 618        }
 619
 620        CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
 621        export = class_handle2object(conn->cookie);
 622        return export;
 623}
 624EXPORT_SYMBOL(class_conn2export);
 625
 626struct obd_device *class_exp2obd(struct obd_export *exp)
 627{
 628        if (exp)
 629                return exp->exp_obd;
 630        return NULL;
 631}
 632EXPORT_SYMBOL(class_exp2obd);
 633
 634struct obd_import *class_exp2cliimp(struct obd_export *exp)
 635{
 636        struct obd_device *obd = exp->exp_obd;
 637
 638        if (!obd)
 639                return NULL;
 640        return obd->u.cli.cl_import;
 641}
 642EXPORT_SYMBOL(class_exp2cliimp);
 643
 644/* Export management functions */
 645static void class_export_destroy(struct obd_export *exp)
 646{
 647        struct obd_device *obd = exp->exp_obd;
 648
 649        LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
 650        LASSERT(obd);
 651
 652        CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
 653               exp->exp_client_uuid.uuid, obd->obd_name);
 654
 655        /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
 656        if (exp->exp_connection)
 657                ptlrpc_put_connection_superhack(exp->exp_connection);
 658
 659        LASSERT(list_empty(&exp->exp_outstanding_replies));
 660        LASSERT(list_empty(&exp->exp_uncommitted_replies));
 661        LASSERT(list_empty(&exp->exp_req_replay_queue));
 662        LASSERT(list_empty(&exp->exp_hp_rpcs));
 663        obd_destroy_export(exp);
 664        class_decref(obd, "export", exp);
 665
 666        OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
 667}
 668
 669static void export_handle_addref(void *export)
 670{
 671        class_export_get(export);
 672}
 673
 674static struct portals_handle_ops export_handle_ops = {
 675        .hop_addref = export_handle_addref,
 676        .hop_free   = NULL,
 677};
 678
 679struct obd_export *class_export_get(struct obd_export *exp)
 680{
 681        atomic_inc(&exp->exp_refcount);
 682        CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
 683               atomic_read(&exp->exp_refcount));
 684        return exp;
 685}
 686EXPORT_SYMBOL(class_export_get);
 687
 688void class_export_put(struct obd_export *exp)
 689{
 690        LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
 691        CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
 692               atomic_read(&exp->exp_refcount) - 1);
 693
 694        if (atomic_dec_and_test(&exp->exp_refcount)) {
 695                LASSERT(!list_empty(&exp->exp_obd_chain));
 696                CDEBUG(D_IOCTL, "final put %p/%s\n",
 697                       exp, exp->exp_client_uuid.uuid);
 698
 699                /* release nid stat refererence */
 700                lprocfs_exp_cleanup(exp);
 701
 702                obd_zombie_export_add(exp);
 703        }
 704}
 705EXPORT_SYMBOL(class_export_put);
 706
 707/* Creates a new export, adds it to the hash table, and returns a
 708 * pointer to it. The refcount is 2: one for the hash reference, and
 709 * one for the pointer returned by this function.
 710 */
 711struct obd_export *class_new_export(struct obd_device *obd,
 712                                    struct obd_uuid *cluuid)
 713{
 714        struct obd_export *export;
 715        struct cfs_hash *hash = NULL;
 716        int rc = 0;
 717
 718        export = kzalloc(sizeof(*export), GFP_NOFS);
 719        if (!export)
 720                return ERR_PTR(-ENOMEM);
 721
 722        export->exp_conn_cnt = 0;
 723        export->exp_lock_hash = NULL;
 724        export->exp_flock_hash = NULL;
 725        atomic_set(&export->exp_refcount, 2);
 726        atomic_set(&export->exp_rpc_count, 0);
 727        atomic_set(&export->exp_cb_count, 0);
 728        atomic_set(&export->exp_locks_count, 0);
 729#if LUSTRE_TRACKS_LOCK_EXP_REFS
 730        INIT_LIST_HEAD(&export->exp_locks_list);
 731        spin_lock_init(&export->exp_locks_list_guard);
 732#endif
 733        atomic_set(&export->exp_replay_count, 0);
 734        export->exp_obd = obd;
 735        INIT_LIST_HEAD(&export->exp_outstanding_replies);
 736        spin_lock_init(&export->exp_uncommitted_replies_lock);
 737        INIT_LIST_HEAD(&export->exp_uncommitted_replies);
 738        INIT_LIST_HEAD(&export->exp_req_replay_queue);
 739        INIT_LIST_HEAD(&export->exp_handle.h_link);
 740        INIT_LIST_HEAD(&export->exp_hp_rpcs);
 741        class_handle_hash(&export->exp_handle, &export_handle_ops);
 742        spin_lock_init(&export->exp_lock);
 743        spin_lock_init(&export->exp_rpc_lock);
 744        INIT_HLIST_NODE(&export->exp_uuid_hash);
 745        spin_lock_init(&export->exp_bl_list_lock);
 746        INIT_LIST_HEAD(&export->exp_bl_list);
 747
 748        export->exp_sp_peer = LUSTRE_SP_ANY;
 749        export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
 750        export->exp_client_uuid = *cluuid;
 751        obd_init_export(export);
 752
 753        spin_lock(&obd->obd_dev_lock);
 754        /* shouldn't happen, but might race */
 755        if (obd->obd_stopping) {
 756                rc = -ENODEV;
 757                goto exit_unlock;
 758        }
 759
 760        hash = cfs_hash_getref(obd->obd_uuid_hash);
 761        if (!hash) {
 762                rc = -ENODEV;
 763                goto exit_unlock;
 764        }
 765        spin_unlock(&obd->obd_dev_lock);
 766
 767        if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
 768                rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
 769                if (rc != 0) {
 770                        LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
 771                                      obd->obd_name, cluuid->uuid, rc);
 772                        rc = -EALREADY;
 773                        goto exit_err;
 774                }
 775        }
 776
 777        spin_lock(&obd->obd_dev_lock);
 778        if (obd->obd_stopping) {
 779                cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
 780                rc = -ENODEV;
 781                goto exit_unlock;
 782        }
 783
 784        class_incref(obd, "export", export);
 785        list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
 786        export->exp_obd->obd_num_exports++;
 787        spin_unlock(&obd->obd_dev_lock);
 788        cfs_hash_putref(hash);
 789        return export;
 790
 791exit_unlock:
 792        spin_unlock(&obd->obd_dev_lock);
 793exit_err:
 794        if (hash)
 795                cfs_hash_putref(hash);
 796        class_handle_unhash(&export->exp_handle);
 797        LASSERT(hlist_unhashed(&export->exp_uuid_hash));
 798        obd_destroy_export(export);
 799        kfree(export);
 800        return ERR_PTR(rc);
 801}
 802EXPORT_SYMBOL(class_new_export);
 803
 804void class_unlink_export(struct obd_export *exp)
 805{
 806        class_handle_unhash(&exp->exp_handle);
 807
 808        spin_lock(&exp->exp_obd->obd_dev_lock);
 809        /* delete an uuid-export hashitem from hashtables */
 810        if (!hlist_unhashed(&exp->exp_uuid_hash))
 811                cfs_hash_del(exp->exp_obd->obd_uuid_hash,
 812                             &exp->exp_client_uuid,
 813                             &exp->exp_uuid_hash);
 814
 815        list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
 816        exp->exp_obd->obd_num_exports--;
 817        spin_unlock(&exp->exp_obd->obd_dev_lock);
 818        class_export_put(exp);
 819}
 820EXPORT_SYMBOL(class_unlink_export);
 821
 822/* Import management functions */
 823static void class_import_destroy(struct obd_import *imp)
 824{
 825        CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
 826               imp->imp_obd->obd_name);
 827
 828        LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
 829
 830        ptlrpc_put_connection_superhack(imp->imp_connection);
 831
 832        while (!list_empty(&imp->imp_conn_list)) {
 833                struct obd_import_conn *imp_conn;
 834
 835                imp_conn = list_entry(imp->imp_conn_list.next,
 836                                      struct obd_import_conn, oic_item);
 837                list_del_init(&imp_conn->oic_item);
 838                ptlrpc_put_connection_superhack(imp_conn->oic_conn);
 839                kfree(imp_conn);
 840        }
 841
 842        LASSERT(!imp->imp_sec);
 843        class_decref(imp->imp_obd, "import", imp);
 844        OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
 845}
 846
 847static void import_handle_addref(void *import)
 848{
 849        class_import_get(import);
 850}
 851
 852static struct portals_handle_ops import_handle_ops = {
 853        .hop_addref = import_handle_addref,
 854        .hop_free   = NULL,
 855};
 856
 857struct obd_import *class_import_get(struct obd_import *import)
 858{
 859        atomic_inc(&import->imp_refcount);
 860        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
 861               atomic_read(&import->imp_refcount),
 862               import->imp_obd->obd_name);
 863        return import;
 864}
 865EXPORT_SYMBOL(class_import_get);
 866
 867void class_import_put(struct obd_import *imp)
 868{
 869        LASSERT(list_empty(&imp->imp_zombie_chain));
 870        LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
 871
 872        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
 873               atomic_read(&imp->imp_refcount) - 1,
 874               imp->imp_obd->obd_name);
 875
 876        if (atomic_dec_and_test(&imp->imp_refcount)) {
 877                CDEBUG(D_INFO, "final put import %p\n", imp);
 878                obd_zombie_import_add(imp);
 879        }
 880
 881        /* catch possible import put race */
 882        LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
 883}
 884EXPORT_SYMBOL(class_import_put);
 885
 886static void init_imp_at(struct imp_at *at)
 887{
 888        int i;
 889
 890        at_init(&at->iat_net_latency, 0, 0);
 891        for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
 892                /* max service estimates are tracked on the server side, so
 893                 * don't use the AT history here, just use the last reported
 894                 * val. (But keep hist for proc histogram, worst_ever)
 895                 */
 896                at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
 897                        AT_FLG_NOHIST);
 898        }
 899}
 900
 901struct obd_import *class_new_import(struct obd_device *obd)
 902{
 903        struct obd_import *imp;
 904
 905        imp = kzalloc(sizeof(*imp), GFP_NOFS);
 906        if (!imp)
 907                return NULL;
 908
 909        INIT_LIST_HEAD(&imp->imp_pinger_chain);
 910        INIT_LIST_HEAD(&imp->imp_zombie_chain);
 911        INIT_LIST_HEAD(&imp->imp_replay_list);
 912        INIT_LIST_HEAD(&imp->imp_sending_list);
 913        INIT_LIST_HEAD(&imp->imp_delayed_list);
 914        INIT_LIST_HEAD(&imp->imp_committed_list);
 915        imp->imp_replay_cursor = &imp->imp_committed_list;
 916        spin_lock_init(&imp->imp_lock);
 917        imp->imp_last_success_conn = 0;
 918        imp->imp_state = LUSTRE_IMP_NEW;
 919        imp->imp_obd = class_incref(obd, "import", imp);
 920        mutex_init(&imp->imp_sec_mutex);
 921        init_waitqueue_head(&imp->imp_recovery_waitq);
 922
 923        atomic_set(&imp->imp_refcount, 2);
 924        atomic_set(&imp->imp_unregistering, 0);
 925        atomic_set(&imp->imp_inflight, 0);
 926        atomic_set(&imp->imp_replay_inflight, 0);
 927        atomic_set(&imp->imp_inval_count, 0);
 928        INIT_LIST_HEAD(&imp->imp_conn_list);
 929        INIT_LIST_HEAD(&imp->imp_handle.h_link);
 930        class_handle_hash(&imp->imp_handle, &import_handle_ops);
 931        init_imp_at(&imp->imp_at);
 932
 933        /* the default magic is V2, will be used in connect RPC, and
 934         * then adjusted according to the flags in request/reply.
 935         */
 936        imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
 937
 938        return imp;
 939}
 940EXPORT_SYMBOL(class_new_import);
 941
 942void class_destroy_import(struct obd_import *import)
 943{
 944        LASSERT(import);
 945        LASSERT(import != LP_POISON);
 946
 947        class_handle_unhash(&import->imp_handle);
 948
 949        spin_lock(&import->imp_lock);
 950        import->imp_generation++;
 951        spin_unlock(&import->imp_lock);
 952        class_import_put(import);
 953}
 954EXPORT_SYMBOL(class_destroy_import);
 955
 956#if LUSTRE_TRACKS_LOCK_EXP_REFS
 957
 958void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
 959{
 960        spin_lock(&exp->exp_locks_list_guard);
 961
 962        LASSERT(lock->l_exp_refs_nr >= 0);
 963
 964        if (lock->l_exp_refs_target && lock->l_exp_refs_target != exp) {
 965                LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
 966                              exp, lock, lock->l_exp_refs_target);
 967        }
 968        if ((lock->l_exp_refs_nr++) == 0) {
 969                list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
 970                lock->l_exp_refs_target = exp;
 971        }
 972        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
 973               lock, exp, lock->l_exp_refs_nr);
 974        spin_unlock(&exp->exp_locks_list_guard);
 975}
 976EXPORT_SYMBOL(__class_export_add_lock_ref);
 977
 978void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
 979{
 980        spin_lock(&exp->exp_locks_list_guard);
 981        LASSERT(lock->l_exp_refs_nr > 0);
 982        if (lock->l_exp_refs_target != exp) {
 983                LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
 984                              lock, lock->l_exp_refs_target, exp);
 985        }
 986        if (-- lock->l_exp_refs_nr == 0) {
 987                list_del_init(&lock->l_exp_refs_link);
 988                lock->l_exp_refs_target = NULL;
 989        }
 990        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
 991               lock, exp, lock->l_exp_refs_nr);
 992        spin_unlock(&exp->exp_locks_list_guard);
 993}
 994EXPORT_SYMBOL(__class_export_del_lock_ref);
 995#endif
 996
 997/* A connection defines an export context in which preallocation can
 998 * be managed. This releases the export pointer reference, and returns
 999 * the export handle, so the export refcount is 1 when this function
1000 * returns.
1001 */
1002int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1003                  struct obd_uuid *cluuid)
1004{
1005        struct obd_export *export;
1006
1007        LASSERT(conn);
1008        LASSERT(obd);
1009        LASSERT(cluuid);
1010
1011        export = class_new_export(obd, cluuid);
1012        if (IS_ERR(export))
1013                return PTR_ERR(export);
1014
1015        conn->cookie = export->exp_handle.h_cookie;
1016        class_export_put(export);
1017
1018        CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1019               cluuid->uuid, conn->cookie);
1020        return 0;
1021}
1022EXPORT_SYMBOL(class_connect);
1023
1024/* This function removes 1-3 references from the export:
1025 * 1 - for export pointer passed
1026 * and if disconnect really need
1027 * 2 - removing from hash
1028 * 3 - in client_unlink_export
1029 * The export pointer passed to this function can destroyed
1030 */
1031int class_disconnect(struct obd_export *export)
1032{
1033        int already_disconnected;
1034
1035        if (!export) {
1036                CWARN("attempting to free NULL export %p\n", export);
1037                return -EINVAL;
1038        }
1039
1040        spin_lock(&export->exp_lock);
1041        already_disconnected = export->exp_disconnected;
1042        export->exp_disconnected = 1;
1043        spin_unlock(&export->exp_lock);
1044
1045        /* class_cleanup(), abort_recovery(), and class_fail_export()
1046         * all end up in here, and if any of them race we shouldn't
1047         * call extra class_export_puts().
1048         */
1049        if (already_disconnected)
1050                goto no_disconn;
1051
1052        CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1053               export->exp_handle.h_cookie);
1054
1055        class_unlink_export(export);
1056no_disconn:
1057        class_export_put(export);
1058        return 0;
1059}
1060EXPORT_SYMBOL(class_disconnect);
1061
1062void class_fail_export(struct obd_export *exp)
1063{
1064        int rc, already_failed;
1065
1066        spin_lock(&exp->exp_lock);
1067        already_failed = exp->exp_failed;
1068        exp->exp_failed = 1;
1069        spin_unlock(&exp->exp_lock);
1070
1071        if (already_failed) {
1072                CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1073                       exp, exp->exp_client_uuid.uuid);
1074                return;
1075        }
1076
1077        CDEBUG(D_HA, "disconnecting export %p/%s\n",
1078               exp, exp->exp_client_uuid.uuid);
1079
1080        if (obd_dump_on_timeout)
1081                libcfs_debug_dumplog();
1082
1083        /* need for safe call CDEBUG after obd_disconnect */
1084        class_export_get(exp);
1085
1086        /* Most callers into obd_disconnect are removing their own reference
1087         * (request, for example) in addition to the one from the hash table.
1088         * We don't have such a reference here, so make one.
1089         */
1090        class_export_get(exp);
1091        rc = obd_disconnect(exp);
1092        if (rc)
1093                CERROR("disconnecting export %p failed: %d\n", exp, rc);
1094        else
1095                CDEBUG(D_HA, "disconnected export %p/%s\n",
1096                       exp, exp->exp_client_uuid.uuid);
1097        class_export_put(exp);
1098}
1099EXPORT_SYMBOL(class_fail_export);
1100
1101#if LUSTRE_TRACKS_LOCK_EXP_REFS
1102void (*class_export_dump_hook)(struct obd_export *) = NULL;
1103EXPORT_SYMBOL(class_export_dump_hook);
1104#endif
1105
1106/* Total amount of zombies to be destroyed */
1107static int zombies_count;
1108
1109/**
1110 * kill zombie imports and exports
1111 */
1112static void obd_zombie_impexp_cull(void)
1113{
1114        struct obd_import *import;
1115        struct obd_export *export;
1116
1117        do {
1118                spin_lock(&obd_zombie_impexp_lock);
1119
1120                import = NULL;
1121                if (!list_empty(&obd_zombie_imports)) {
1122                        import = list_entry(obd_zombie_imports.next,
1123                                            struct obd_import,
1124                                            imp_zombie_chain);
1125                        list_del_init(&import->imp_zombie_chain);
1126                }
1127
1128                export = NULL;
1129                if (!list_empty(&obd_zombie_exports)) {
1130                        export = list_entry(obd_zombie_exports.next,
1131                                            struct obd_export,
1132                                            exp_obd_chain);
1133                        list_del_init(&export->exp_obd_chain);
1134                }
1135
1136                spin_unlock(&obd_zombie_impexp_lock);
1137
1138                if (import) {
1139                        class_import_destroy(import);
1140                        spin_lock(&obd_zombie_impexp_lock);
1141                        zombies_count--;
1142                        spin_unlock(&obd_zombie_impexp_lock);
1143                }
1144
1145                if (export) {
1146                        class_export_destroy(export);
1147                        spin_lock(&obd_zombie_impexp_lock);
1148                        zombies_count--;
1149                        spin_unlock(&obd_zombie_impexp_lock);
1150                }
1151
1152                cond_resched();
1153        } while (import || export);
1154}
1155
1156static struct completion        obd_zombie_start;
1157static struct completion        obd_zombie_stop;
1158static unsigned long            obd_zombie_flags;
1159static wait_queue_head_t                obd_zombie_waitq;
1160static pid_t                    obd_zombie_pid;
1161
1162enum {
1163        OBD_ZOMBIE_STOP         = 0x0001,
1164};
1165
1166/**
1167 * check for work for kill zombie import/export thread.
1168 */
1169static int obd_zombie_impexp_check(void *arg)
1170{
1171        int rc;
1172
1173        spin_lock(&obd_zombie_impexp_lock);
1174        rc = (zombies_count == 0) &&
1175             !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1176        spin_unlock(&obd_zombie_impexp_lock);
1177
1178        return rc;
1179}
1180
1181/**
1182 * Add export to the obd_zombie thread and notify it.
1183 */
1184static void obd_zombie_export_add(struct obd_export *exp)
1185{
1186        spin_lock(&exp->exp_obd->obd_dev_lock);
1187        LASSERT(!list_empty(&exp->exp_obd_chain));
1188        list_del_init(&exp->exp_obd_chain);
1189        spin_unlock(&exp->exp_obd->obd_dev_lock);
1190        spin_lock(&obd_zombie_impexp_lock);
1191        zombies_count++;
1192        list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1193        spin_unlock(&obd_zombie_impexp_lock);
1194
1195        obd_zombie_impexp_notify();
1196}
1197
1198/**
1199 * Add import to the obd_zombie thread and notify it.
1200 */
1201static void obd_zombie_import_add(struct obd_import *imp)
1202{
1203        LASSERT(!imp->imp_sec);
1204        spin_lock(&obd_zombie_impexp_lock);
1205        LASSERT(list_empty(&imp->imp_zombie_chain));
1206        zombies_count++;
1207        list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1208        spin_unlock(&obd_zombie_impexp_lock);
1209
1210        obd_zombie_impexp_notify();
1211}
1212
1213/**
1214 * notify import/export destroy thread about new zombie.
1215 */
1216static void obd_zombie_impexp_notify(void)
1217{
1218        /*
1219         * Make sure obd_zombie_impexp_thread get this notification.
1220         * It is possible this signal only get by obd_zombie_barrier, and
1221         * barrier gulps this notification and sleeps away and hangs ensues
1222         */
1223        wake_up_all(&obd_zombie_waitq);
1224}
1225
1226/**
1227 * check whether obd_zombie is idle
1228 */
1229static int obd_zombie_is_idle(void)
1230{
1231        int rc;
1232
1233        LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1234        spin_lock(&obd_zombie_impexp_lock);
1235        rc = (zombies_count == 0);
1236        spin_unlock(&obd_zombie_impexp_lock);
1237        return rc;
1238}
1239
1240/**
1241 * wait when obd_zombie import/export queues become empty
1242 */
1243void obd_zombie_barrier(void)
1244{
1245        struct l_wait_info lwi = { 0 };
1246
1247        if (obd_zombie_pid == current_pid())
1248                /* don't wait for myself */
1249                return;
1250        l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1251}
1252EXPORT_SYMBOL(obd_zombie_barrier);
1253
1254/**
1255 * destroy zombie export/import thread.
1256 */
1257static int obd_zombie_impexp_thread(void *unused)
1258{
1259        unshare_fs_struct();
1260        complete(&obd_zombie_start);
1261
1262        obd_zombie_pid = current_pid();
1263
1264        while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1265                struct l_wait_info lwi = { 0 };
1266
1267                l_wait_event(obd_zombie_waitq,
1268                             !obd_zombie_impexp_check(NULL), &lwi);
1269                obd_zombie_impexp_cull();
1270
1271                /*
1272                 * Notify obd_zombie_barrier callers that queues
1273                 * may be empty.
1274                 */
1275                wake_up(&obd_zombie_waitq);
1276        }
1277
1278        complete(&obd_zombie_stop);
1279
1280        return 0;
1281}
1282
1283/**
1284 * start destroy zombie import/export thread
1285 */
1286int obd_zombie_impexp_init(void)
1287{
1288        struct task_struct *task;
1289
1290        INIT_LIST_HEAD(&obd_zombie_imports);
1291        INIT_LIST_HEAD(&obd_zombie_exports);
1292        spin_lock_init(&obd_zombie_impexp_lock);
1293        init_completion(&obd_zombie_start);
1294        init_completion(&obd_zombie_stop);
1295        init_waitqueue_head(&obd_zombie_waitq);
1296        obd_zombie_pid = 0;
1297
1298        task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1299        if (IS_ERR(task))
1300                return PTR_ERR(task);
1301
1302        wait_for_completion(&obd_zombie_start);
1303        return 0;
1304}
1305
1306/**
1307 * stop destroy zombie import/export thread
1308 */
1309void obd_zombie_impexp_stop(void)
1310{
1311        set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1312        obd_zombie_impexp_notify();
1313        wait_for_completion(&obd_zombie_stop);
1314}
1315