linux/drivers/staging/lustre/lustre/obdecho/echo_client.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 */
  32
  33#define DEBUG_SUBSYSTEM S_ECHO
  34#include "../../include/linux/libcfs/libcfs.h"
  35
  36#include "../include/obd.h"
  37#include "../include/obd_support.h"
  38#include "../include/obd_class.h"
  39#include "../include/lustre_debug.h"
  40#include "../include/lprocfs_status.h"
  41#include "../include/cl_object.h"
  42#include "../include/lustre_fid.h"
  43#include "../include/lustre_acl.h"
  44#include "../include/lustre/lustre_ioctl.h"
  45#include "../include/lustre_net.h"
  46
  47#include "echo_internal.h"
  48
  49/** \defgroup echo_client Echo Client
  50 * @{
  51 */
  52
  53struct echo_device {
  54        struct cl_device        ed_cl;
  55        struct echo_client_obd *ed_ec;
  56
  57        struct cl_site    ed_site_myself;
  58        struct lu_site          *ed_site;
  59        struct lu_device       *ed_next;
  60};
  61
  62struct echo_object {
  63        struct cl_object        eo_cl;
  64        struct cl_object_header eo_hdr;
  65
  66        struct echo_device     *eo_dev;
  67        struct list_head              eo_obj_chain;
  68        struct lov_oinfo       *eo_oinfo;
  69        atomic_t            eo_npages;
  70        int                  eo_deleted;
  71};
  72
  73struct echo_object_conf {
  74        struct cl_object_conf  eoc_cl;
  75        struct lov_oinfo      **eoc_oinfo;
  76};
  77
  78struct echo_page {
  79        struct cl_page_slice   ep_cl;
  80        struct mutex            ep_lock;
  81};
  82
  83struct echo_lock {
  84        struct cl_lock_slice   el_cl;
  85        struct list_head             el_chain;
  86        struct echo_object    *el_object;
  87        __u64             el_cookie;
  88        atomic_t           el_refcount;
  89};
  90
  91static int echo_client_setup(const struct lu_env *env,
  92                             struct obd_device *obddev,
  93                             struct lustre_cfg *lcfg);
  94static int echo_client_cleanup(struct obd_device *obddev);
  95
  96/** \defgroup echo_helpers Helper functions
  97 * @{
  98 */
  99static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
 100{
 101        return container_of0(dev, struct echo_device, ed_cl);
 102}
 103
 104static inline struct cl_device *echo_dev2cl(struct echo_device *d)
 105{
 106        return &d->ed_cl;
 107}
 108
 109static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
 110{
 111        return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
 112}
 113
 114static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
 115{
 116        return &eco->eo_cl;
 117}
 118
 119static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
 120{
 121        return container_of(o, struct echo_object, eo_cl);
 122}
 123
 124static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
 125{
 126        return container_of(s, struct echo_page, ep_cl);
 127}
 128
 129static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
 130{
 131        return container_of(s, struct echo_lock, el_cl);
 132}
 133
 134static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
 135{
 136        return ecl->el_cl.cls_lock;
 137}
 138
 139static struct lu_context_key echo_thread_key;
 140static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
 141{
 142        struct echo_thread_info *info;
 143
 144        info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
 145        LASSERT(info);
 146        return info;
 147}
 148
 149static inline
 150struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
 151{
 152        return container_of(c, struct echo_object_conf, eoc_cl);
 153}
 154
 155/** @} echo_helpers */
 156static int cl_echo_object_put(struct echo_object *eco);
 157static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
 158                              struct page **pages, int npages, int async);
 159
 160struct echo_thread_info {
 161        struct echo_object_conf eti_conf;
 162        struct lustre_md        eti_md;
 163
 164        struct cl_2queue        eti_queue;
 165        struct cl_io        eti_io;
 166        struct cl_lock          eti_lock;
 167        struct lu_fid      eti_fid;
 168        struct lu_fid           eti_fid2;
 169};
 170
 171/* No session used right now */
 172struct echo_session_info {
 173        unsigned long dummy;
 174};
 175
 176static struct kmem_cache *echo_lock_kmem;
 177static struct kmem_cache *echo_object_kmem;
 178static struct kmem_cache *echo_thread_kmem;
 179static struct kmem_cache *echo_session_kmem;
 180
 181static struct lu_kmem_descr echo_caches[] = {
 182        {
 183                .ckd_cache = &echo_lock_kmem,
 184                .ckd_name  = "echo_lock_kmem",
 185                .ckd_size  = sizeof(struct echo_lock)
 186        },
 187        {
 188                .ckd_cache = &echo_object_kmem,
 189                .ckd_name  = "echo_object_kmem",
 190                .ckd_size  = sizeof(struct echo_object)
 191        },
 192        {
 193                .ckd_cache = &echo_thread_kmem,
 194                .ckd_name  = "echo_thread_kmem",
 195                .ckd_size  = sizeof(struct echo_thread_info)
 196        },
 197        {
 198                .ckd_cache = &echo_session_kmem,
 199                .ckd_name  = "echo_session_kmem",
 200                .ckd_size  = sizeof(struct echo_session_info)
 201        },
 202        {
 203                .ckd_cache = NULL
 204        }
 205};
 206
 207/** \defgroup echo_page Page operations
 208 *
 209 * Echo page operations.
 210 *
 211 * @{
 212 */
 213static int echo_page_own(const struct lu_env *env,
 214                         const struct cl_page_slice *slice,
 215                         struct cl_io *io, int nonblock)
 216{
 217        struct echo_page *ep = cl2echo_page(slice);
 218
 219        if (!nonblock)
 220                mutex_lock(&ep->ep_lock);
 221        else if (!mutex_trylock(&ep->ep_lock))
 222                return -EAGAIN;
 223        return 0;
 224}
 225
 226static void echo_page_disown(const struct lu_env *env,
 227                             const struct cl_page_slice *slice,
 228                             struct cl_io *io)
 229{
 230        struct echo_page *ep = cl2echo_page(slice);
 231
 232        LASSERT(mutex_is_locked(&ep->ep_lock));
 233        mutex_unlock(&ep->ep_lock);
 234}
 235
 236static void echo_page_discard(const struct lu_env *env,
 237                              const struct cl_page_slice *slice,
 238                              struct cl_io *unused)
 239{
 240        cl_page_delete(env, slice->cpl_page);
 241}
 242
 243static int echo_page_is_vmlocked(const struct lu_env *env,
 244                                 const struct cl_page_slice *slice)
 245{
 246        if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
 247                return -EBUSY;
 248        return -ENODATA;
 249}
 250
 251static void echo_page_completion(const struct lu_env *env,
 252                                 const struct cl_page_slice *slice,
 253                                 int ioret)
 254{
 255        LASSERT(slice->cpl_page->cp_sync_io);
 256}
 257
 258static void echo_page_fini(const struct lu_env *env,
 259                           struct cl_page_slice *slice)
 260{
 261        struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
 262
 263        atomic_dec(&eco->eo_npages);
 264        put_page(slice->cpl_page->cp_vmpage);
 265}
 266
 267static int echo_page_prep(const struct lu_env *env,
 268                          const struct cl_page_slice *slice,
 269                          struct cl_io *unused)
 270{
 271        return 0;
 272}
 273
 274static int echo_page_print(const struct lu_env *env,
 275                           const struct cl_page_slice *slice,
 276                           void *cookie, lu_printer_t printer)
 277{
 278        struct echo_page *ep = cl2echo_page(slice);
 279
 280        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME "-page@%p %d vm@%p\n",
 281                   ep, mutex_is_locked(&ep->ep_lock),
 282                   slice->cpl_page->cp_vmpage);
 283        return 0;
 284}
 285
 286static const struct cl_page_operations echo_page_ops = {
 287        .cpo_own           = echo_page_own,
 288        .cpo_disown     = echo_page_disown,
 289        .cpo_discard       = echo_page_discard,
 290        .cpo_fini         = echo_page_fini,
 291        .cpo_print       = echo_page_print,
 292        .cpo_is_vmlocked   = echo_page_is_vmlocked,
 293        .io = {
 294                [CRT_READ] = {
 295                        .cpo_prep       = echo_page_prep,
 296                        .cpo_completion  = echo_page_completion,
 297                },
 298                [CRT_WRITE] = {
 299                        .cpo_prep       = echo_page_prep,
 300                        .cpo_completion  = echo_page_completion,
 301                }
 302        }
 303};
 304
 305/** @} echo_page */
 306
 307/** \defgroup echo_lock Locking
 308 *
 309 * echo lock operations
 310 *
 311 * @{
 312 */
 313static void echo_lock_fini(const struct lu_env *env,
 314                           struct cl_lock_slice *slice)
 315{
 316        struct echo_lock *ecl = cl2echo_lock(slice);
 317
 318        LASSERT(list_empty(&ecl->el_chain));
 319        kmem_cache_free(echo_lock_kmem, ecl);
 320}
 321
 322static struct cl_lock_operations echo_lock_ops = {
 323        .clo_fini      = echo_lock_fini,
 324};
 325
 326/** @} echo_lock */
 327
 328/** \defgroup echo_cl_ops cl_object operations
 329 *
 330 * operations for cl_object
 331 *
 332 * @{
 333 */
 334static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
 335                          struct cl_page *page, pgoff_t index)
 336{
 337        struct echo_page *ep = cl_object_page_slice(obj, page);
 338        struct echo_object *eco = cl2echo_obj(obj);
 339
 340        get_page(page->cp_vmpage);
 341        mutex_init(&ep->ep_lock);
 342        cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
 343        atomic_inc(&eco->eo_npages);
 344        return 0;
 345}
 346
 347static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
 348                        struct cl_io *io)
 349{
 350        return 0;
 351}
 352
 353static int echo_lock_init(const struct lu_env *env,
 354                          struct cl_object *obj, struct cl_lock *lock,
 355                          const struct cl_io *unused)
 356{
 357        struct echo_lock *el;
 358
 359        el = kmem_cache_zalloc(echo_lock_kmem, GFP_NOFS);
 360        if (el) {
 361                cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
 362                el->el_object = cl2echo_obj(obj);
 363                INIT_LIST_HEAD(&el->el_chain);
 364                atomic_set(&el->el_refcount, 0);
 365        }
 366        return !el ? -ENOMEM : 0;
 367}
 368
 369static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
 370                         const struct cl_object_conf *conf)
 371{
 372        return 0;
 373}
 374
 375static const struct cl_object_operations echo_cl_obj_ops = {
 376        .coo_page_init = echo_page_init,
 377        .coo_lock_init = echo_lock_init,
 378        .coo_io_init   = echo_io_init,
 379        .coo_conf_set  = echo_conf_set
 380};
 381
 382/** @} echo_cl_ops */
 383
 384/** \defgroup echo_lu_ops lu_object operations
 385 *
 386 * operations for echo lu object.
 387 *
 388 * @{
 389 */
 390static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
 391                            const struct lu_object_conf *conf)
 392{
 393        struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
 394        struct echo_client_obd *ec     = ed->ed_ec;
 395        struct echo_object *eco = cl2echo_obj(lu2cl(obj));
 396        const struct cl_object_conf *cconf;
 397        struct echo_object_conf *econf;
 398
 399        if (ed->ed_next) {
 400                struct lu_object  *below;
 401                struct lu_device  *under;
 402
 403                under = ed->ed_next;
 404                below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
 405                                                        under);
 406                if (!below)
 407                        return -ENOMEM;
 408                lu_object_add(obj, below);
 409        }
 410
 411        cconf = lu2cl_conf(conf);
 412        econf = cl2echo_conf(cconf);
 413
 414        LASSERT(econf->eoc_oinfo);
 415        /*
 416         * Transfer the oinfo pointer to eco that it won't be
 417         * freed.
 418         */
 419        eco->eo_oinfo = *econf->eoc_oinfo;
 420        *econf->eoc_oinfo = NULL;
 421
 422        eco->eo_dev = ed;
 423        atomic_set(&eco->eo_npages, 0);
 424        cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
 425
 426        spin_lock(&ec->ec_lock);
 427        list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
 428        spin_unlock(&ec->ec_lock);
 429
 430        return 0;
 431}
 432
 433static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
 434{
 435        struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
 436        struct echo_client_obd *ec = eco->eo_dev->ed_ec;
 437
 438        LASSERT(atomic_read(&eco->eo_npages) == 0);
 439
 440        spin_lock(&ec->ec_lock);
 441        list_del_init(&eco->eo_obj_chain);
 442        spin_unlock(&ec->ec_lock);
 443
 444        lu_object_fini(obj);
 445        lu_object_header_fini(obj->lo_header);
 446
 447        kfree(eco->eo_oinfo);
 448        kmem_cache_free(echo_object_kmem, eco);
 449}
 450
 451static int echo_object_print(const struct lu_env *env, void *cookie,
 452                             lu_printer_t p, const struct lu_object *o)
 453{
 454        struct echo_object *obj = cl2echo_obj(lu2cl(o));
 455
 456        return (*p)(env, cookie, "echoclient-object@%p", obj);
 457}
 458
 459static const struct lu_object_operations echo_lu_obj_ops = {
 460        .loo_object_init      = echo_object_init,
 461        .loo_object_delete    = NULL,
 462        .loo_object_release   = NULL,
 463        .loo_object_free      = echo_object_free,
 464        .loo_object_print     = echo_object_print,
 465        .loo_object_invariant = NULL
 466};
 467
 468/** @} echo_lu_ops */
 469
 470/** \defgroup echo_lu_dev_ops  lu_device operations
 471 *
 472 * Operations for echo lu device.
 473 *
 474 * @{
 475 */
 476static struct lu_object *echo_object_alloc(const struct lu_env *env,
 477                                           const struct lu_object_header *hdr,
 478                                           struct lu_device *dev)
 479{
 480        struct echo_object *eco;
 481        struct lu_object *obj = NULL;
 482
 483        /* we're the top dev. */
 484        LASSERT(!hdr);
 485        eco = kmem_cache_zalloc(echo_object_kmem, GFP_NOFS);
 486        if (eco) {
 487                struct cl_object_header *hdr = &eco->eo_hdr;
 488
 489                obj = &echo_obj2cl(eco)->co_lu;
 490                cl_object_header_init(hdr);
 491                hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
 492
 493                lu_object_init(obj, &hdr->coh_lu, dev);
 494                lu_object_add_top(&hdr->coh_lu, obj);
 495
 496                eco->eo_cl.co_ops = &echo_cl_obj_ops;
 497                obj->lo_ops       = &echo_lu_obj_ops;
 498        }
 499        return obj;
 500}
 501
 502static const struct lu_device_operations echo_device_lu_ops = {
 503        .ldo_object_alloc   = echo_object_alloc,
 504};
 505
 506/** @} echo_lu_dev_ops */
 507
 508/** \defgroup echo_init Setup and teardown
 509 *
 510 * Init and fini functions for echo client.
 511 *
 512 * @{
 513 */
 514static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
 515{
 516        struct cl_site *site = &ed->ed_site_myself;
 517        int rc;
 518
 519        /* initialize site */
 520        rc = cl_site_init(site, &ed->ed_cl);
 521        if (rc) {
 522                CERROR("Cannot initialize site for echo client(%d)\n", rc);
 523                return rc;
 524        }
 525
 526        rc = lu_site_init_finish(&site->cs_lu);
 527        if (rc) {
 528                cl_site_fini(site);
 529                return rc;
 530        }
 531
 532        ed->ed_site = &site->cs_lu;
 533        return 0;
 534}
 535
 536static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
 537{
 538        if (ed->ed_site) {
 539                lu_site_fini(ed->ed_site);
 540                ed->ed_site = NULL;
 541        }
 542}
 543
 544static void *echo_thread_key_init(const struct lu_context *ctx,
 545                                  struct lu_context_key *key)
 546{
 547        struct echo_thread_info *info;
 548
 549        info = kmem_cache_zalloc(echo_thread_kmem, GFP_NOFS);
 550        if (!info)
 551                info = ERR_PTR(-ENOMEM);
 552        return info;
 553}
 554
 555static void echo_thread_key_fini(const struct lu_context *ctx,
 556                                 struct lu_context_key *key, void *data)
 557{
 558        struct echo_thread_info *info = data;
 559
 560        kmem_cache_free(echo_thread_kmem, info);
 561}
 562
 563static struct lu_context_key echo_thread_key = {
 564        .lct_tags = LCT_CL_THREAD,
 565        .lct_init = echo_thread_key_init,
 566        .lct_fini = echo_thread_key_fini,
 567};
 568
 569static void *echo_session_key_init(const struct lu_context *ctx,
 570                                   struct lu_context_key *key)
 571{
 572        struct echo_session_info *session;
 573
 574        session = kmem_cache_zalloc(echo_session_kmem, GFP_NOFS);
 575        if (!session)
 576                session = ERR_PTR(-ENOMEM);
 577        return session;
 578}
 579
 580static void echo_session_key_fini(const struct lu_context *ctx,
 581                                  struct lu_context_key *key, void *data)
 582{
 583        struct echo_session_info *session = data;
 584
 585        kmem_cache_free(echo_session_kmem, session);
 586}
 587
 588static struct lu_context_key echo_session_key = {
 589        .lct_tags = LCT_SESSION,
 590        .lct_init = echo_session_key_init,
 591        .lct_fini = echo_session_key_fini,
 592};
 593
 594LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
 595
 596static struct lu_device *echo_device_alloc(const struct lu_env *env,
 597                                           struct lu_device_type *t,
 598                                           struct lustre_cfg *cfg)
 599{
 600        struct lu_device   *next;
 601        struct echo_device *ed;
 602        struct cl_device   *cd;
 603        struct obd_device  *obd = NULL; /* to keep compiler happy */
 604        struct obd_device  *tgt;
 605        const char *tgt_type_name;
 606        int rc, err;
 607
 608        ed = kzalloc(sizeof(*ed), GFP_NOFS);
 609        if (!ed) {
 610                rc = -ENOMEM;
 611                goto out;
 612        }
 613
 614        cd = &ed->ed_cl;
 615        rc = cl_device_init(cd, t);
 616        if (rc)
 617                goto out_free;
 618
 619        cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
 620
 621        obd = class_name2obd(lustre_cfg_string(cfg, 0));
 622        LASSERT(obd);
 623        LASSERT(env);
 624
 625        tgt = class_name2obd(lustre_cfg_string(cfg, 1));
 626        if (!tgt) {
 627                CERROR("Can not find tgt device %s\n",
 628                       lustre_cfg_string(cfg, 1));
 629                rc = -ENODEV;
 630                goto out_device_fini;
 631        }
 632
 633        next = tgt->obd_lu_dev;
 634        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
 635                CERROR("echo MDT client must be run on server\n");
 636                rc = -EOPNOTSUPP;
 637                goto out_device_fini;
 638        }
 639
 640        rc = echo_site_init(env, ed);
 641        if (rc)
 642                goto out_device_fini;
 643
 644        rc = echo_client_setup(env, obd, cfg);
 645        if (rc)
 646                goto out_site_fini;
 647
 648        ed->ed_ec = &obd->u.echo_client;
 649
 650        /* if echo client is to be stacked upon ost device, the next is
 651         * NULL since ost is not a clio device so far
 652         */
 653        if (next && !lu_device_is_cl(next))
 654                next = NULL;
 655
 656        tgt_type_name = tgt->obd_type->typ_name;
 657        if (next) {
 658                if (next->ld_site) {
 659                        rc = -EBUSY;
 660                        goto out_cleanup;
 661                }
 662
 663                next->ld_site = ed->ed_site;
 664                rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
 665                                                next->ld_type->ldt_name,
 666                                                              NULL);
 667                if (rc)
 668                        goto out_cleanup;
 669
 670        } else {
 671                LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
 672        }
 673
 674        ed->ed_next = next;
 675        return &cd->cd_lu_dev;
 676
 677out_cleanup:
 678        err = echo_client_cleanup(obd);
 679        if (err)
 680                CERROR("Cleanup obd device %s error(%d)\n",
 681                       obd->obd_name, err);
 682out_site_fini:
 683        echo_site_fini(env, ed);
 684out_device_fini:
 685        cl_device_fini(&ed->ed_cl);
 686out_free:
 687        kfree(ed);
 688out:
 689        return ERR_PTR(rc);
 690}
 691
 692static int echo_device_init(const struct lu_env *env, struct lu_device *d,
 693                            const char *name, struct lu_device *next)
 694{
 695        LBUG();
 696        return 0;
 697}
 698
 699static struct lu_device *echo_device_fini(const struct lu_env *env,
 700                                          struct lu_device *d)
 701{
 702        struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
 703        struct lu_device *next = ed->ed_next;
 704
 705        while (next)
 706                next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
 707        return NULL;
 708}
 709
 710static void echo_lock_release(const struct lu_env *env,
 711                              struct echo_lock *ecl,
 712                              int still_used)
 713{
 714        struct cl_lock *clk = echo_lock2cl(ecl);
 715
 716        cl_lock_release(env, clk);
 717}
 718
 719static struct lu_device *echo_device_free(const struct lu_env *env,
 720                                          struct lu_device *d)
 721{
 722        struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
 723        struct echo_client_obd *ec   = ed->ed_ec;
 724        struct echo_object     *eco;
 725        struct lu_device       *next = ed->ed_next;
 726
 727        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
 728               ed, next);
 729
 730        lu_site_purge(env, ed->ed_site, -1);
 731
 732        /* check if there are objects still alive.
 733         * It shouldn't have any object because lu_site_purge would cleanup
 734         * all of cached objects. Anyway, probably the echo device is being
 735         * parallelly accessed.
 736         */
 737        spin_lock(&ec->ec_lock);
 738        list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
 739                eco->eo_deleted = 1;
 740        spin_unlock(&ec->ec_lock);
 741
 742        /* purge again */
 743        lu_site_purge(env, ed->ed_site, -1);
 744
 745        CDEBUG(D_INFO,
 746               "Waiting for the reference of echo object to be dropped\n");
 747
 748        /* Wait for the last reference to be dropped. */
 749        spin_lock(&ec->ec_lock);
 750        while (!list_empty(&ec->ec_objects)) {
 751                spin_unlock(&ec->ec_lock);
 752                CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
 753                set_current_state(TASK_UNINTERRUPTIBLE);
 754                schedule_timeout(cfs_time_seconds(1));
 755                lu_site_purge(env, ed->ed_site, -1);
 756                spin_lock(&ec->ec_lock);
 757        }
 758        spin_unlock(&ec->ec_lock);
 759
 760        LASSERT(list_empty(&ec->ec_locks));
 761
 762        CDEBUG(D_INFO, "No object exists, exiting...\n");
 763
 764        echo_client_cleanup(d->ld_obd);
 765
 766        while (next)
 767                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
 768
 769        LASSERT(ed->ed_site == d->ld_site);
 770        echo_site_fini(env, ed);
 771        cl_device_fini(&ed->ed_cl);
 772        kfree(ed);
 773
 774        cl_env_cache_purge(~0);
 775
 776        return NULL;
 777}
 778
 779static const struct lu_device_type_operations echo_device_type_ops = {
 780        .ldto_init = echo_type_init,
 781        .ldto_fini = echo_type_fini,
 782
 783        .ldto_start = echo_type_start,
 784        .ldto_stop  = echo_type_stop,
 785
 786        .ldto_device_alloc = echo_device_alloc,
 787        .ldto_device_free  = echo_device_free,
 788        .ldto_device_init  = echo_device_init,
 789        .ldto_device_fini  = echo_device_fini
 790};
 791
 792static struct lu_device_type echo_device_type = {
 793        .ldt_tags     = LU_DEVICE_CL,
 794        .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
 795        .ldt_ops      = &echo_device_type_ops,
 796        .ldt_ctx_tags = LCT_CL_THREAD,
 797};
 798
 799/** @} echo_init */
 800
 801/** \defgroup echo_exports Exported operations
 802 *
 803 * exporting functions to echo client
 804 *
 805 * @{
 806 */
 807
 808/* Interfaces to echo client obd device */
 809static struct echo_object *
 810cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
 811{
 812        struct lu_env *env;
 813        struct echo_thread_info *info;
 814        struct echo_object_conf *conf;
 815        struct lov_oinfo *oinfo = NULL;
 816        struct echo_object *eco;
 817        struct cl_object   *obj;
 818        struct lu_fid *fid;
 819        u16 refcheck;
 820        int rc;
 821
 822        LASSERTF(ostid_id(oi), DOSTID "\n", POSTID(oi));
 823        LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID "\n", POSTID(oi));
 824
 825        /* Never return an object if the obd is to be freed. */
 826        if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
 827                return ERR_PTR(-ENODEV);
 828
 829        env = cl_env_get(&refcheck);
 830        if (IS_ERR(env))
 831                return (void *)env;
 832
 833        info = echo_env_info(env);
 834        conf = &info->eti_conf;
 835        if (d->ed_next) {
 836                oinfo = kzalloc(sizeof(*oinfo), GFP_NOFS);
 837                if (!oinfo) {
 838                        eco = ERR_PTR(-ENOMEM);
 839                        goto out;
 840                }
 841
 842                oinfo->loi_oi = *oi;
 843                conf->eoc_cl.u.coc_oinfo = oinfo;
 844        }
 845
 846        /*
 847         * If echo_object_init() is successful then ownership of oinfo
 848         * is transferred to the object.
 849         */
 850        conf->eoc_oinfo = &oinfo;
 851
 852        fid  = &info->eti_fid;
 853        rc = ostid_to_fid(fid, (struct ost_id *)oi, 0);
 854        if (rc != 0) {
 855                eco = ERR_PTR(rc);
 856                goto out;
 857        }
 858
 859        /* In the function below, .hs_keycmp resolves to
 860         * lu_obj_hop_keycmp()
 861         */
 862        /* coverity[overrun-buffer-val] */
 863        obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
 864        if (IS_ERR(obj)) {
 865                eco = (void *)obj;
 866                goto out;
 867        }
 868
 869        eco = cl2echo_obj(obj);
 870        if (eco->eo_deleted) {
 871                cl_object_put(env, obj);
 872                eco = ERR_PTR(-EAGAIN);
 873        }
 874
 875out:
 876        kfree(oinfo);
 877        cl_env_put(env, &refcheck);
 878        return eco;
 879}
 880
 881static int cl_echo_object_put(struct echo_object *eco)
 882{
 883        struct lu_env *env;
 884        struct cl_object *obj = echo_obj2cl(eco);
 885        u16 refcheck;
 886
 887        env = cl_env_get(&refcheck);
 888        if (IS_ERR(env))
 889                return PTR_ERR(env);
 890
 891        /* an external function to kill an object? */
 892        if (eco->eo_deleted) {
 893                struct lu_object_header *loh = obj->co_lu.lo_header;
 894
 895                LASSERT(&eco->eo_hdr == luh2coh(loh));
 896                set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
 897        }
 898
 899        cl_object_put(env, obj);
 900        cl_env_put(env, &refcheck);
 901        return 0;
 902}
 903
 904static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
 905                            u64 start, u64 end, int mode,
 906                            __u64 *cookie, __u32 enqflags)
 907{
 908        struct cl_io *io;
 909        struct cl_lock *lck;
 910        struct cl_object *obj;
 911        struct cl_lock_descr *descr;
 912        struct echo_thread_info *info;
 913        int rc = -ENOMEM;
 914
 915        info = echo_env_info(env);
 916        io = &info->eti_io;
 917        lck = &info->eti_lock;
 918        obj = echo_obj2cl(eco);
 919
 920        memset(lck, 0, sizeof(*lck));
 921        descr = &lck->cll_descr;
 922        descr->cld_obj   = obj;
 923        descr->cld_start = cl_index(obj, start);
 924        descr->cld_end   = cl_index(obj, end);
 925        descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
 926        descr->cld_enq_flags = enqflags;
 927        io->ci_obj = obj;
 928
 929        rc = cl_lock_request(env, io, lck);
 930        if (rc == 0) {
 931                struct echo_client_obd *ec = eco->eo_dev->ed_ec;
 932                struct echo_lock *el;
 933
 934                el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
 935                spin_lock(&ec->ec_lock);
 936                if (list_empty(&el->el_chain)) {
 937                        list_add(&el->el_chain, &ec->ec_locks);
 938                        el->el_cookie = ++ec->ec_unique;
 939                }
 940                atomic_inc(&el->el_refcount);
 941                *cookie = el->el_cookie;
 942                spin_unlock(&ec->ec_lock);
 943        }
 944        return rc;
 945}
 946
 947static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
 948                           __u64 cookie)
 949{
 950        struct echo_client_obd *ec = ed->ed_ec;
 951        struct echo_lock       *ecl = NULL;
 952        struct list_head             *el;
 953        int found = 0, still_used = 0;
 954
 955        spin_lock(&ec->ec_lock);
 956        list_for_each(el, &ec->ec_locks) {
 957                ecl = list_entry(el, struct echo_lock, el_chain);
 958                CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
 959                found = (ecl->el_cookie == cookie);
 960                if (found) {
 961                        if (atomic_dec_and_test(&ecl->el_refcount))
 962                                list_del_init(&ecl->el_chain);
 963                        else
 964                                still_used = 1;
 965                        break;
 966                }
 967        }
 968        spin_unlock(&ec->ec_lock);
 969
 970        if (!found)
 971                return -ENOENT;
 972
 973        echo_lock_release(env, ecl, still_used);
 974        return 0;
 975}
 976
 977static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
 978                                 struct cl_page *page)
 979{
 980        struct echo_thread_info *info;
 981        struct cl_2queue *queue;
 982
 983        info = echo_env_info(env);
 984        LASSERT(io == &info->eti_io);
 985
 986        queue = &info->eti_queue;
 987        cl_page_list_add(&queue->c2_qout, page);
 988}
 989
 990static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
 991                              struct page **pages, int npages, int async)
 992{
 993        struct lu_env      *env;
 994        struct echo_thread_info *info;
 995        struct cl_object        *obj = echo_obj2cl(eco);
 996        struct echo_device      *ed  = eco->eo_dev;
 997        struct cl_2queue        *queue;
 998        struct cl_io        *io;
 999        struct cl_page    *clp;
1000        struct lustre_handle    lh = { 0 };
1001        size_t page_size = cl_page_size(obj);
1002        u16 refcheck;
1003        int rc;
1004        int i;
1005
1006        LASSERT((offset & ~PAGE_MASK) == 0);
1007        LASSERT(ed->ed_next);
1008        env = cl_env_get(&refcheck);
1009        if (IS_ERR(env))
1010                return PTR_ERR(env);
1011
1012        info    = echo_env_info(env);
1013        io      = &info->eti_io;
1014        queue   = &info->eti_queue;
1015
1016        cl_2queue_init(queue);
1017
1018        io->ci_ignore_layout = 1;
1019        rc = cl_io_init(env, io, CIT_MISC, obj);
1020        if (rc < 0)
1021                goto out;
1022        LASSERT(rc == 0);
1023
1024        rc = cl_echo_enqueue0(env, eco, offset,
1025                              offset + npages * PAGE_SIZE - 1,
1026                              rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1027                              CEF_NEVER);
1028        if (rc < 0)
1029                goto error_lock;
1030
1031        for (i = 0; i < npages; i++) {
1032                LASSERT(pages[i]);
1033                clp = cl_page_find(env, obj, cl_index(obj, offset),
1034                                   pages[i], CPT_TRANSIENT);
1035                if (IS_ERR(clp)) {
1036                        rc = PTR_ERR(clp);
1037                        break;
1038                }
1039                LASSERT(clp->cp_type == CPT_TRANSIENT);
1040
1041                rc = cl_page_own(env, io, clp);
1042                if (rc) {
1043                        LASSERT(clp->cp_state == CPS_FREEING);
1044                        cl_page_put(env, clp);
1045                        break;
1046                }
1047                /*
1048                 * Add a page to the incoming page list of 2-queue.
1049                 */
1050                cl_page_list_add(&queue->c2_qin, clp);
1051
1052                /* drop the reference count for cl_page_find, so that the page
1053                 * will be freed in cl_2queue_fini.
1054                 */
1055                cl_page_put(env, clp);
1056                cl_page_clip(env, clp, 0, page_size);
1057
1058                offset += page_size;
1059        }
1060
1061        if (rc == 0) {
1062                enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1063
1064                async = async && (typ == CRT_WRITE);
1065                if (async)
1066                        rc = cl_io_commit_async(env, io, &queue->c2_qin,
1067                                                0, PAGE_SIZE,
1068                                                echo_commit_callback);
1069                else
1070                        rc = cl_io_submit_sync(env, io, typ, queue, 0);
1071                CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1072                       async ? "async" : "sync", rc);
1073        }
1074
1075        cl_echo_cancel0(env, ed, lh.cookie);
1076error_lock:
1077        cl_2queue_discard(env, io, queue);
1078        cl_2queue_disown(env, io, queue);
1079        cl_2queue_fini(env, queue);
1080        cl_io_fini(env, io);
1081out:
1082        cl_env_put(env, &refcheck);
1083        return rc;
1084}
1085
1086/** @} echo_exports */
1087
1088static u64 last_object_id;
1089
1090static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1091                              struct obdo *oa)
1092{
1093        struct echo_object     *eco;
1094        struct echo_client_obd *ec = ed->ed_ec;
1095        int                  rc;
1096        int                  created = 0;
1097
1098        if (!(oa->o_valid & OBD_MD_FLID) ||
1099            !(oa->o_valid & OBD_MD_FLGROUP) ||
1100            !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
1101                CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1102                return -EINVAL;
1103        }
1104
1105        if (!ostid_id(&oa->o_oi))
1106                ostid_set_id(&oa->o_oi, ++last_object_id);
1107
1108        rc = obd_create(env, ec->ec_exp, oa);
1109        if (rc != 0) {
1110                CERROR("Cannot create objects: rc = %d\n", rc);
1111                goto failed;
1112        }
1113        created = 1;
1114
1115        oa->o_valid |= OBD_MD_FLID;
1116
1117        eco = cl_echo_object_find(ed, &oa->o_oi);
1118        if (IS_ERR(eco)) {
1119                rc = PTR_ERR(eco);
1120                goto failed;
1121        }
1122        cl_echo_object_put(eco);
1123
1124        CDEBUG(D_INFO, "oa oid " DOSTID "\n", POSTID(&oa->o_oi));
1125
1126 failed:
1127        if (created && rc)
1128                obd_destroy(env, ec->ec_exp, oa);
1129        if (rc)
1130                CERROR("create object failed with: rc = %d\n", rc);
1131        return rc;
1132}
1133
1134static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1135                           struct obdo *oa)
1136{
1137        struct echo_object     *eco;
1138        int                  rc;
1139
1140        if (!(oa->o_valid & OBD_MD_FLID) || !(oa->o_valid & OBD_MD_FLGROUP) ||
1141            !ostid_id(&oa->o_oi)) {
1142                CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1143                return -EINVAL;
1144        }
1145
1146        rc = 0;
1147        eco = cl_echo_object_find(ed, &oa->o_oi);
1148        if (!IS_ERR(eco))
1149                *ecop = eco;
1150        else
1151                rc = PTR_ERR(eco);
1152        return rc;
1153}
1154
1155static void echo_put_object(struct echo_object *eco)
1156{
1157        int rc;
1158
1159        rc = cl_echo_object_put(eco);
1160        if (rc)
1161                CERROR("%s: echo client drop an object failed: rc = %d\n",
1162                       eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
1163}
1164
1165static void
1166echo_client_page_debug_setup(struct page *page, int rw, u64 id,
1167                             u64 offset, u64 count)
1168{
1169        char    *addr;
1170        u64      stripe_off;
1171        u64      stripe_id;
1172        int      delta;
1173
1174        /* no partial pages on the client */
1175        LASSERT(count == PAGE_SIZE);
1176
1177        addr = kmap(page);
1178
1179        for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1180                if (rw == OBD_BRW_WRITE) {
1181                        stripe_off = offset + delta;
1182                        stripe_id = id;
1183                } else {
1184                        stripe_off = 0xdeadbeef00c0ffeeULL;
1185                        stripe_id = 0xdeadbeef00c0ffeeULL;
1186                }
1187                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1188                                  stripe_off, stripe_id);
1189        }
1190
1191        kunmap(page);
1192}
1193
1194static int echo_client_page_debug_check(struct page *page, u64 id,
1195                                        u64 offset, u64 count)
1196{
1197        u64     stripe_off;
1198        u64     stripe_id;
1199        char   *addr;
1200        int     delta;
1201        int     rc;
1202        int     rc2;
1203
1204        /* no partial pages on the client */
1205        LASSERT(count == PAGE_SIZE);
1206
1207        addr = kmap(page);
1208
1209        for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1210                stripe_off = offset + delta;
1211                stripe_id = id;
1212
1213                rc2 = block_debug_check("test_brw",
1214                                        addr + delta, OBD_ECHO_BLOCK_SIZE,
1215                                        stripe_off, stripe_id);
1216                if (rc2 != 0) {
1217                        CERROR("Error in echo object %#llx\n", id);
1218                        rc = rc2;
1219                }
1220        }
1221
1222        kunmap(page);
1223        return rc;
1224}
1225
1226static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1227                            struct echo_object *eco, u64 offset,
1228                            u64 count, int async)
1229{
1230        u32            npages;
1231        struct brw_page *pga;
1232        struct brw_page *pgp;
1233        struct page         **pages;
1234        u64              off;
1235        int                  i;
1236        int                  rc;
1237        int                  verify;
1238        gfp_t                gfp_mask;
1239        int                  brw_flags = 0;
1240
1241        verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1242                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1243                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1244
1245        gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
1246
1247        LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1248
1249        if (count <= 0 ||
1250            (count & (~PAGE_MASK)) != 0)
1251                return -EINVAL;
1252
1253        /* XXX think again with misaligned I/O */
1254        npages = count >> PAGE_SHIFT;
1255
1256        if (rw == OBD_BRW_WRITE)
1257                brw_flags = OBD_BRW_ASYNC;
1258
1259        pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
1260        if (!pga)
1261                return -ENOMEM;
1262
1263        pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
1264        if (!pages) {
1265                kfree(pga);
1266                return -ENOMEM;
1267        }
1268
1269        for (i = 0, pgp = pga, off = offset;
1270             i < npages;
1271             i++, pgp++, off += PAGE_SIZE) {
1272                LASSERT(!pgp->pg);      /* for cleanup */
1273
1274                rc = -ENOMEM;
1275                pgp->pg = alloc_page(gfp_mask);
1276                if (!pgp->pg)
1277                        goto out;
1278
1279                pages[i] = pgp->pg;
1280                pgp->count = PAGE_SIZE;
1281                pgp->off = off;
1282                pgp->flag = brw_flags;
1283
1284                if (verify)
1285                        echo_client_page_debug_setup(pgp->pg, rw,
1286                                                     ostid_id(&oa->o_oi), off,
1287                                                     pgp->count);
1288        }
1289
1290        /* brw mode can only be used at client */
1291        LASSERT(ed->ed_next);
1292        rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1293
1294 out:
1295        if (rc != 0 || rw != OBD_BRW_READ)
1296                verify = 0;
1297
1298        for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1299                if (!pgp->pg)
1300                        continue;
1301
1302                if (verify) {
1303                        int vrc;
1304
1305                        vrc = echo_client_page_debug_check(pgp->pg,
1306                                                           ostid_id(&oa->o_oi),
1307                                                           pgp->off, pgp->count);
1308                        if (vrc != 0 && rc == 0)
1309                                rc = vrc;
1310                }
1311                __free_page(pgp->pg);
1312        }
1313        kfree(pga);
1314        kfree(pages);
1315        return rc;
1316}
1317
1318static int echo_client_prep_commit(const struct lu_env *env,
1319                                   struct obd_export *exp, int rw,
1320                                   struct obdo *oa, struct echo_object *eco,
1321                                   u64 offset, u64 count,
1322                                   u64 batch, int async)
1323{
1324        struct obd_ioobj ioo;
1325        struct niobuf_local *lnb;
1326        struct niobuf_remote rnb;
1327        u64 off;
1328        u64 npages, tot_pages;
1329        int i, ret = 0, brw_flags = 0;
1330
1331        if (count <= 0 || (count & (~PAGE_MASK)) != 0)
1332                return -EINVAL;
1333
1334        npages = batch >> PAGE_SHIFT;
1335        tot_pages = count >> PAGE_SHIFT;
1336
1337        lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1338        if (!lnb) {
1339                ret = -ENOMEM;
1340                goto out;
1341        }
1342
1343        if (rw == OBD_BRW_WRITE && async)
1344                brw_flags |= OBD_BRW_ASYNC;
1345
1346        obdo_to_ioobj(oa, &ioo);
1347
1348        off = offset;
1349
1350        for (; tot_pages > 0; tot_pages -= npages) {
1351                int lpages;
1352
1353                if (tot_pages < npages)
1354                        npages = tot_pages;
1355
1356                rnb.rnb_offset = off;
1357                rnb.rnb_len = npages * PAGE_SIZE;
1358                rnb.rnb_flags = brw_flags;
1359                ioo.ioo_bufcnt = 1;
1360                off += npages * PAGE_SIZE;
1361
1362                lpages = npages;
1363                ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
1364                if (ret != 0)
1365                        goto out;
1366
1367                for (i = 0; i < lpages; i++) {
1368                        struct page *page = lnb[i].lnb_page;
1369
1370                        /* read past eof? */
1371                        if (!page  && lnb[i].lnb_rc == 0)
1372                                continue;
1373
1374                        if (async)
1375                                lnb[i].lnb_flags |= OBD_BRW_ASYNC;
1376
1377                        if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1378                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1379                            (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1380                                continue;
1381
1382                        if (rw == OBD_BRW_WRITE)
1383                                echo_client_page_debug_setup(page, rw,
1384                                                             ostid_id(&oa->o_oi),
1385                                                             lnb[i].lnb_file_offset,
1386                                                             lnb[i].lnb_len);
1387                        else
1388                                echo_client_page_debug_check(page,
1389                                                             ostid_id(&oa->o_oi),
1390                                                             lnb[i].lnb_file_offset,
1391                                                             lnb[i].lnb_len);
1392                }
1393
1394                ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
1395                                   ret);
1396                if (ret != 0)
1397                        goto out;
1398
1399                /* Reuse env context. */
1400                lu_context_exit((struct lu_context *)&env->le_ctx);
1401                lu_context_enter((struct lu_context *)&env->le_ctx);
1402        }
1403
1404out:
1405        kfree(lnb);
1406        return ret;
1407}
1408
1409static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1410                                 struct obd_export *exp,
1411                                 struct obd_ioctl_data *data)
1412{
1413        struct obd_device *obd = class_exp2obd(exp);
1414        struct echo_device *ed = obd2echo_dev(obd);
1415        struct echo_client_obd *ec = ed->ed_ec;
1416        struct obdo *oa = &data->ioc_obdo1;
1417        struct echo_object *eco;
1418        int rc;
1419        int async = 1;
1420        long test_mode;
1421
1422        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1423
1424        rc = echo_get_object(&eco, ed, oa);
1425        if (rc)
1426                return rc;
1427
1428        oa->o_valid &= ~OBD_MD_FLHANDLE;
1429
1430        /* OFD/obdfilter works only via prep/commit */
1431        test_mode = (long)data->ioc_pbuf1;
1432        if (test_mode == 1)
1433                async = 0;
1434
1435        if (!ed->ed_next && test_mode != 3) {
1436                test_mode = 3;
1437                data->ioc_plen1 = data->ioc_count;
1438        }
1439
1440        /* Truncate batch size to maximum */
1441        if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1442                data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1443
1444        switch (test_mode) {
1445        case 1:
1446                /* fall through */
1447        case 2:
1448                rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
1449                                      data->ioc_count, async);
1450                break;
1451        case 3:
1452                rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
1453                                             data->ioc_offset, data->ioc_count,
1454                                             data->ioc_plen1, async);
1455                break;
1456        default:
1457                rc = -EINVAL;
1458        }
1459        echo_put_object(eco);
1460        return rc;
1461}
1462
1463static int
1464echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1465                      void *karg, void __user *uarg)
1466{
1467        struct obd_device      *obd = exp->exp_obd;
1468        struct echo_device     *ed = obd2echo_dev(obd);
1469        struct echo_client_obd *ec = ed->ed_ec;
1470        struct echo_object     *eco;
1471        struct obd_ioctl_data  *data = karg;
1472        struct lu_env     *env;
1473        struct obdo         *oa;
1474        struct lu_fid      fid;
1475        int                  rw = OBD_BRW_READ;
1476        int                  rc = 0;
1477
1478        oa = &data->ioc_obdo1;
1479        if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1480                oa->o_valid |= OBD_MD_FLGROUP;
1481                ostid_set_seq_echo(&oa->o_oi);
1482        }
1483
1484        /* This FID is unpacked just for validation at this point */
1485        rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1486        if (rc < 0)
1487                return rc;
1488
1489        env = kzalloc(sizeof(*env), GFP_NOFS);
1490        if (!env)
1491                return -ENOMEM;
1492
1493        rc = lu_env_init(env, LCT_DT_THREAD);
1494        if (rc) {
1495                rc = -ENOMEM;
1496                goto out;
1497        }
1498
1499        switch (cmd) {
1500        case OBD_IOC_CREATE:                /* may create echo object */
1501                if (!capable(CFS_CAP_SYS_ADMIN)) {
1502                        rc = -EPERM;
1503                        goto out;
1504                }
1505
1506                rc = echo_create_object(env, ed, oa);
1507                goto out;
1508
1509        case OBD_IOC_DESTROY:
1510                if (!capable(CFS_CAP_SYS_ADMIN)) {
1511                        rc = -EPERM;
1512                        goto out;
1513                }
1514
1515                rc = echo_get_object(&eco, ed, oa);
1516                if (rc == 0) {
1517                        rc = obd_destroy(env, ec->ec_exp, oa);
1518                        if (rc == 0)
1519                                eco->eo_deleted = 1;
1520                        echo_put_object(eco);
1521                }
1522                goto out;
1523
1524        case OBD_IOC_GETATTR:
1525                rc = echo_get_object(&eco, ed, oa);
1526                if (rc == 0) {
1527                        rc = obd_getattr(env, ec->ec_exp, oa);
1528                        echo_put_object(eco);
1529                }
1530                goto out;
1531
1532        case OBD_IOC_SETATTR:
1533                if (!capable(CFS_CAP_SYS_ADMIN)) {
1534                        rc = -EPERM;
1535                        goto out;
1536                }
1537
1538                rc = echo_get_object(&eco, ed, oa);
1539                if (rc == 0) {
1540                        rc = obd_setattr(env, ec->ec_exp, oa);
1541                        echo_put_object(eco);
1542                }
1543                goto out;
1544
1545        case OBD_IOC_BRW_WRITE:
1546                if (!capable(CFS_CAP_SYS_ADMIN)) {
1547                        rc = -EPERM;
1548                        goto out;
1549                }
1550
1551                rw = OBD_BRW_WRITE;
1552                /* fall through */
1553        case OBD_IOC_BRW_READ:
1554                rc = echo_client_brw_ioctl(env, rw, exp, data);
1555                goto out;
1556
1557        default:
1558                CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1559                rc = -ENOTTY;
1560                goto out;
1561        }
1562
1563out:
1564        lu_env_fini(env);
1565        kfree(env);
1566
1567        return rc;
1568}
1569
1570static int echo_client_setup(const struct lu_env *env,
1571                             struct obd_device *obddev, struct lustre_cfg *lcfg)
1572{
1573        struct echo_client_obd *ec = &obddev->u.echo_client;
1574        struct obd_device *tgt;
1575        struct obd_uuid echo_uuid = { "ECHO_UUID" };
1576        struct obd_connect_data *ocd = NULL;
1577        int rc;
1578
1579        if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1580                CERROR("requires a TARGET OBD name\n");
1581                return -EINVAL;
1582        }
1583
1584        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1585        if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1586                CERROR("device not attached or not set up (%s)\n",
1587                       lustre_cfg_string(lcfg, 1));
1588                return -EINVAL;
1589        }
1590
1591        spin_lock_init(&ec->ec_lock);
1592        INIT_LIST_HEAD(&ec->ec_objects);
1593        INIT_LIST_HEAD(&ec->ec_locks);
1594        ec->ec_unique = 0;
1595
1596        ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
1597        if (!ocd)
1598                return -ENOMEM;
1599
1600        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
1601                                 OBD_CONNECT_BRW_SIZE |
1602                                 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
1603                                 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
1604                                 OBD_CONNECT_FID;
1605        ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
1606        ocd->ocd_version = LUSTRE_VERSION_CODE;
1607        ocd->ocd_group = FID_SEQ_ECHO;
1608
1609        rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
1610
1611        kfree(ocd);
1612
1613        if (rc != 0) {
1614                CERROR("fail to connect to device %s\n",
1615                       lustre_cfg_string(lcfg, 1));
1616                return rc;
1617        }
1618
1619        return rc;
1620}
1621
1622static int echo_client_cleanup(struct obd_device *obddev)
1623{
1624        struct echo_client_obd *ec = &obddev->u.echo_client;
1625        int rc;
1626
1627        if (!list_empty(&obddev->obd_exports)) {
1628                CERROR("still has clients!\n");
1629                return -EBUSY;
1630        }
1631
1632        LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
1633        rc = obd_disconnect(ec->ec_exp);
1634        if (rc != 0)
1635                CERROR("fail to disconnect device: %d\n", rc);
1636
1637        return rc;
1638}
1639
1640static int echo_client_connect(const struct lu_env *env,
1641                               struct obd_export **exp,
1642                               struct obd_device *src, struct obd_uuid *cluuid,
1643                               struct obd_connect_data *data, void *localdata)
1644{
1645        int             rc;
1646        struct lustre_handle conn = { 0 };
1647
1648        rc = class_connect(&conn, src, cluuid);
1649        if (rc == 0)
1650                *exp = class_conn2export(&conn);
1651
1652        return rc;
1653}
1654
1655static int echo_client_disconnect(struct obd_export *exp)
1656{
1657        int                  rc;
1658
1659        if (!exp) {
1660                rc = -EINVAL;
1661                goto out;
1662        }
1663
1664        rc = class_disconnect(exp);
1665        goto out;
1666 out:
1667        return rc;
1668}
1669
1670static struct obd_ops echo_client_obd_ops = {
1671        .owner          = THIS_MODULE,
1672        .iocontrol      = echo_client_iocontrol,
1673        .connect        = echo_client_connect,
1674        .disconnect     = echo_client_disconnect
1675};
1676
1677static int echo_client_init(void)
1678{
1679        int rc;
1680
1681        rc = lu_kmem_init(echo_caches);
1682        if (rc == 0) {
1683                rc = class_register_type(&echo_client_obd_ops, NULL,
1684                                         LUSTRE_ECHO_CLIENT_NAME,
1685                                         &echo_device_type);
1686                if (rc)
1687                        lu_kmem_fini(echo_caches);
1688        }
1689        return rc;
1690}
1691
1692static void echo_client_exit(void)
1693{
1694        class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
1695        lu_kmem_fini(echo_caches);
1696}
1697
1698static int __init obdecho_init(void)
1699{
1700        LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1701
1702        LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
1703
1704        return echo_client_init();
1705}
1706
1707static void /*__exit*/ obdecho_exit(void)
1708{
1709        echo_client_exit();
1710}
1711
1712MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1713MODULE_DESCRIPTION("Lustre Echo Client test driver");
1714MODULE_VERSION(LUSTRE_VERSION_STRING);
1715MODULE_LICENSE("GPL");
1716
1717module_init(obdecho_init);
1718module_exit(obdecho_exit);
1719
1720/** @} echo_client */
1721