linux/drivers/staging/lustre/lustre/obdecho/echo_client.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_ECHO
  38#include "../../include/linux/libcfs/libcfs.h"
  39
  40#include "../include/obd.h"
  41#include "../include/obd_support.h"
  42#include "../include/obd_class.h"
  43#include "../include/lustre_debug.h"
  44#include "../include/lprocfs_status.h"
  45#include "../include/cl_object.h"
  46#include "../include/lustre_fid.h"
  47#include "../include/lustre_acl.h"
  48#include "../include/lustre_net.h"
  49
  50#include "echo_internal.h"
  51
  52/** \defgroup echo_client Echo Client
  53 * @{
  54 */
  55
  56struct echo_device {
  57        struct cl_device        ed_cl;
  58        struct echo_client_obd *ed_ec;
  59
  60        struct cl_site    ed_site_myself;
  61        struct cl_site   *ed_site;
  62        struct lu_device       *ed_next;
  63        int                  ed_next_islov;
  64};
  65
  66struct echo_object {
  67        struct cl_object        eo_cl;
  68        struct cl_object_header eo_hdr;
  69
  70        struct echo_device     *eo_dev;
  71        struct list_head              eo_obj_chain;
  72        struct lov_stripe_md   *eo_lsm;
  73        atomic_t            eo_npages;
  74        int                  eo_deleted;
  75};
  76
  77struct echo_object_conf {
  78        struct cl_object_conf  eoc_cl;
  79        struct lov_stripe_md **eoc_md;
  80};
  81
  82struct echo_page {
  83        struct cl_page_slice   ep_cl;
  84        struct mutex            ep_lock;
  85        struct page         *ep_vmpage;
  86};
  87
  88struct echo_lock {
  89        struct cl_lock_slice   el_cl;
  90        struct list_head             el_chain;
  91        struct echo_object    *el_object;
  92        __u64             el_cookie;
  93        atomic_t           el_refcount;
  94};
  95
  96static int echo_client_setup(const struct lu_env *env,
  97                             struct obd_device *obddev,
  98                             struct lustre_cfg *lcfg);
  99static int echo_client_cleanup(struct obd_device *obddev);
 100
 101
 102/** \defgroup echo_helpers Helper functions
 103 * @{
 104 */
 105static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
 106{
 107        return container_of0(dev, struct echo_device, ed_cl);
 108}
 109
 110static inline struct cl_device *echo_dev2cl(struct echo_device *d)
 111{
 112        return &d->ed_cl;
 113}
 114
 115static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
 116{
 117        return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
 118}
 119
 120static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
 121{
 122        return &eco->eo_cl;
 123}
 124
 125static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
 126{
 127        return container_of(o, struct echo_object, eo_cl);
 128}
 129
 130static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
 131{
 132        return container_of(s, struct echo_page, ep_cl);
 133}
 134
 135static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
 136{
 137        return container_of(s, struct echo_lock, el_cl);
 138}
 139
 140static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
 141{
 142        return ecl->el_cl.cls_lock;
 143}
 144
 145static struct lu_context_key echo_thread_key;
 146static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
 147{
 148        struct echo_thread_info *info;
 149        info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
 150        LASSERT(info != NULL);
 151        return info;
 152}
 153
 154static inline
 155struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
 156{
 157        return container_of(c, struct echo_object_conf, eoc_cl);
 158}
 159
 160/** @} echo_helpers */
 161
 162static struct echo_object *cl_echo_object_find(struct echo_device *d,
 163                                               struct lov_stripe_md **lsm);
 164static int cl_echo_object_put(struct echo_object *eco);
 165static int cl_echo_enqueue(struct echo_object *eco, u64 start,
 166                           u64 end, int mode, __u64 *cookie);
 167static int cl_echo_cancel(struct echo_device *d, __u64 cookie);
 168static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
 169                              struct page **pages, int npages, int async);
 170
 171static struct echo_thread_info *echo_env_info(const struct lu_env *env);
 172
 173struct echo_thread_info {
 174        struct echo_object_conf eti_conf;
 175        struct lustre_md        eti_md;
 176
 177        struct cl_2queue        eti_queue;
 178        struct cl_io        eti_io;
 179        struct cl_lock_descr    eti_descr;
 180        struct lu_fid      eti_fid;
 181        struct lu_fid           eti_fid2;
 182};
 183
 184/* No session used right now */
 185struct echo_session_info {
 186        unsigned long dummy;
 187};
 188
 189static struct kmem_cache *echo_lock_kmem;
 190static struct kmem_cache *echo_object_kmem;
 191static struct kmem_cache *echo_thread_kmem;
 192static struct kmem_cache *echo_session_kmem;
 193
 194static struct lu_kmem_descr echo_caches[] = {
 195        {
 196                .ckd_cache = &echo_lock_kmem,
 197                .ckd_name  = "echo_lock_kmem",
 198                .ckd_size  = sizeof (struct echo_lock)
 199        },
 200        {
 201                .ckd_cache = &echo_object_kmem,
 202                .ckd_name  = "echo_object_kmem",
 203                .ckd_size  = sizeof (struct echo_object)
 204        },
 205        {
 206                .ckd_cache = &echo_thread_kmem,
 207                .ckd_name  = "echo_thread_kmem",
 208                .ckd_size  = sizeof (struct echo_thread_info)
 209        },
 210        {
 211                .ckd_cache = &echo_session_kmem,
 212                .ckd_name  = "echo_session_kmem",
 213                .ckd_size  = sizeof (struct echo_session_info)
 214        },
 215        {
 216                .ckd_cache = NULL
 217        }
 218};
 219
 220/** \defgroup echo_page Page operations
 221 *
 222 * Echo page operations.
 223 *
 224 * @{
 225 */
 226static struct page *echo_page_vmpage(const struct lu_env *env,
 227                                    const struct cl_page_slice *slice)
 228{
 229        return cl2echo_page(slice)->ep_vmpage;
 230}
 231
 232static int echo_page_own(const struct lu_env *env,
 233                         const struct cl_page_slice *slice,
 234                         struct cl_io *io, int nonblock)
 235{
 236        struct echo_page *ep = cl2echo_page(slice);
 237
 238        if (!nonblock)
 239                mutex_lock(&ep->ep_lock);
 240        else if (!mutex_trylock(&ep->ep_lock))
 241                return -EAGAIN;
 242        return 0;
 243}
 244
 245static void echo_page_disown(const struct lu_env *env,
 246                             const struct cl_page_slice *slice,
 247                             struct cl_io *io)
 248{
 249        struct echo_page *ep = cl2echo_page(slice);
 250
 251        LASSERT(mutex_is_locked(&ep->ep_lock));
 252        mutex_unlock(&ep->ep_lock);
 253}
 254
 255static void echo_page_discard(const struct lu_env *env,
 256                              const struct cl_page_slice *slice,
 257                              struct cl_io *unused)
 258{
 259        cl_page_delete(env, slice->cpl_page);
 260}
 261
 262static int echo_page_is_vmlocked(const struct lu_env *env,
 263                                 const struct cl_page_slice *slice)
 264{
 265        if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
 266                return -EBUSY;
 267        return -ENODATA;
 268}
 269
 270static void echo_page_completion(const struct lu_env *env,
 271                                 const struct cl_page_slice *slice,
 272                                 int ioret)
 273{
 274        LASSERT(slice->cpl_page->cp_sync_io != NULL);
 275}
 276
 277static void echo_page_fini(const struct lu_env *env,
 278                           struct cl_page_slice *slice)
 279{
 280        struct echo_page *ep    = cl2echo_page(slice);
 281        struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
 282        struct page *vmpage      = ep->ep_vmpage;
 283
 284        atomic_dec(&eco->eo_npages);
 285        page_cache_release(vmpage);
 286}
 287
 288static int echo_page_prep(const struct lu_env *env,
 289                          const struct cl_page_slice *slice,
 290                          struct cl_io *unused)
 291{
 292        return 0;
 293}
 294
 295static int echo_page_print(const struct lu_env *env,
 296                           const struct cl_page_slice *slice,
 297                           void *cookie, lu_printer_t printer)
 298{
 299        struct echo_page *ep = cl2echo_page(slice);
 300
 301        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
 302                   ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
 303        return 0;
 304}
 305
 306static const struct cl_page_operations echo_page_ops = {
 307        .cpo_own           = echo_page_own,
 308        .cpo_disown     = echo_page_disown,
 309        .cpo_discard       = echo_page_discard,
 310        .cpo_vmpage     = echo_page_vmpage,
 311        .cpo_fini         = echo_page_fini,
 312        .cpo_print       = echo_page_print,
 313        .cpo_is_vmlocked   = echo_page_is_vmlocked,
 314        .io = {
 315                [CRT_READ] = {
 316                        .cpo_prep       = echo_page_prep,
 317                        .cpo_completion  = echo_page_completion,
 318                },
 319                [CRT_WRITE] = {
 320                        .cpo_prep       = echo_page_prep,
 321                        .cpo_completion  = echo_page_completion,
 322                }
 323        }
 324};
 325/** @} echo_page */
 326
 327/** \defgroup echo_lock Locking
 328 *
 329 * echo lock operations
 330 *
 331 * @{
 332 */
 333static void echo_lock_fini(const struct lu_env *env,
 334                           struct cl_lock_slice *slice)
 335{
 336        struct echo_lock *ecl = cl2echo_lock(slice);
 337
 338        LASSERT(list_empty(&ecl->el_chain));
 339        OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
 340}
 341
 342static void echo_lock_delete(const struct lu_env *env,
 343                             const struct cl_lock_slice *slice)
 344{
 345        struct echo_lock *ecl      = cl2echo_lock(slice);
 346
 347        LASSERT(list_empty(&ecl->el_chain));
 348}
 349
 350static int echo_lock_fits_into(const struct lu_env *env,
 351                               const struct cl_lock_slice *slice,
 352                               const struct cl_lock_descr *need,
 353                               const struct cl_io *unused)
 354{
 355        return 1;
 356}
 357
 358static struct cl_lock_operations echo_lock_ops = {
 359        .clo_fini      = echo_lock_fini,
 360        .clo_delete    = echo_lock_delete,
 361        .clo_fits_into = echo_lock_fits_into
 362};
 363
 364/** @} echo_lock */
 365
 366/** \defgroup echo_cl_ops cl_object operations
 367 *
 368 * operations for cl_object
 369 *
 370 * @{
 371 */
 372static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
 373                        struct cl_page *page, struct page *vmpage)
 374{
 375        struct echo_page *ep = cl_object_page_slice(obj, page);
 376        struct echo_object *eco = cl2echo_obj(obj);
 377
 378        ep->ep_vmpage = vmpage;
 379        page_cache_get(vmpage);
 380        mutex_init(&ep->ep_lock);
 381        cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
 382        atomic_inc(&eco->eo_npages);
 383        return 0;
 384}
 385
 386static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
 387                        struct cl_io *io)
 388{
 389        return 0;
 390}
 391
 392static int echo_lock_init(const struct lu_env *env,
 393                          struct cl_object *obj, struct cl_lock *lock,
 394                          const struct cl_io *unused)
 395{
 396        struct echo_lock *el;
 397
 398        OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
 399        if (el != NULL) {
 400                cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
 401                el->el_object = cl2echo_obj(obj);
 402                INIT_LIST_HEAD(&el->el_chain);
 403                atomic_set(&el->el_refcount, 0);
 404        }
 405        return el == NULL ? -ENOMEM : 0;
 406}
 407
 408static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
 409                         const struct cl_object_conf *conf)
 410{
 411        return 0;
 412}
 413
 414static const struct cl_object_operations echo_cl_obj_ops = {
 415        .coo_page_init = echo_page_init,
 416        .coo_lock_init = echo_lock_init,
 417        .coo_io_init   = echo_io_init,
 418        .coo_conf_set  = echo_conf_set
 419};
 420/** @} echo_cl_ops */
 421
 422/** \defgroup echo_lu_ops lu_object operations
 423 *
 424 * operations for echo lu object.
 425 *
 426 * @{
 427 */
 428static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
 429                            const struct lu_object_conf *conf)
 430{
 431        struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
 432        struct echo_client_obd *ec     = ed->ed_ec;
 433        struct echo_object *eco = cl2echo_obj(lu2cl(obj));
 434        const struct cl_object_conf *cconf;
 435        struct echo_object_conf *econf;
 436
 437        if (ed->ed_next) {
 438                struct lu_object  *below;
 439                struct lu_device  *under;
 440
 441                under = ed->ed_next;
 442                below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
 443                                                        under);
 444                if (below == NULL)
 445                        return -ENOMEM;
 446                lu_object_add(obj, below);
 447        }
 448
 449        cconf = lu2cl_conf(conf);
 450        econf = cl2echo_conf(cconf);
 451
 452        LASSERT(econf->eoc_md);
 453        eco->eo_lsm = *econf->eoc_md;
 454        /* clear the lsm pointer so that it won't get freed. */
 455        *econf->eoc_md = NULL;
 456
 457        eco->eo_dev = ed;
 458        atomic_set(&eco->eo_npages, 0);
 459        cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
 460
 461        spin_lock(&ec->ec_lock);
 462        list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
 463        spin_unlock(&ec->ec_lock);
 464
 465        return 0;
 466}
 467
 468/* taken from osc_unpackmd() */
 469static int echo_alloc_memmd(struct echo_device *ed,
 470                            struct lov_stripe_md **lsmp)
 471{
 472        int lsm_size;
 473
 474        /* If export is lov/osc then use their obd method */
 475        if (ed->ed_next != NULL)
 476                return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
 477        /* OFD has no unpackmd method, do everything here */
 478        lsm_size = lov_stripe_md_size(1);
 479
 480        LASSERT(*lsmp == NULL);
 481        OBD_ALLOC(*lsmp, lsm_size);
 482        if (*lsmp == NULL)
 483                return -ENOMEM;
 484
 485        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 486        if ((*lsmp)->lsm_oinfo[0] == NULL) {
 487                OBD_FREE(*lsmp, lsm_size);
 488                return -ENOMEM;
 489        }
 490
 491        loi_init((*lsmp)->lsm_oinfo[0]);
 492        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
 493        ostid_set_seq_echo(&(*lsmp)->lsm_oi);
 494
 495        return lsm_size;
 496}
 497
 498static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
 499{
 500        int lsm_size;
 501
 502        /* If export is lov/osc then use their obd method */
 503        if (ed->ed_next != NULL)
 504                return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
 505        /* OFD has no unpackmd method, do everything here */
 506        lsm_size = lov_stripe_md_size(1);
 507
 508        LASSERT(*lsmp != NULL);
 509        OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 510        OBD_FREE(*lsmp, lsm_size);
 511        *lsmp = NULL;
 512        return 0;
 513}
 514
 515static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
 516{
 517        struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
 518        struct echo_client_obd *ec = eco->eo_dev->ed_ec;
 519
 520        LASSERT(atomic_read(&eco->eo_npages) == 0);
 521
 522        spin_lock(&ec->ec_lock);
 523        list_del_init(&eco->eo_obj_chain);
 524        spin_unlock(&ec->ec_lock);
 525
 526        lu_object_fini(obj);
 527        lu_object_header_fini(obj->lo_header);
 528
 529        if (eco->eo_lsm)
 530                echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
 531        OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
 532}
 533
 534static int echo_object_print(const struct lu_env *env, void *cookie,
 535                            lu_printer_t p, const struct lu_object *o)
 536{
 537        struct echo_object *obj = cl2echo_obj(lu2cl(o));
 538
 539        return (*p)(env, cookie, "echoclient-object@%p", obj);
 540}
 541
 542static const struct lu_object_operations echo_lu_obj_ops = {
 543        .loo_object_init      = echo_object_init,
 544        .loo_object_delete    = NULL,
 545        .loo_object_release   = NULL,
 546        .loo_object_free      = echo_object_free,
 547        .loo_object_print     = echo_object_print,
 548        .loo_object_invariant = NULL
 549};
 550/** @} echo_lu_ops */
 551
 552/** \defgroup echo_lu_dev_ops  lu_device operations
 553 *
 554 * Operations for echo lu device.
 555 *
 556 * @{
 557 */
 558static struct lu_object *echo_object_alloc(const struct lu_env *env,
 559                                           const struct lu_object_header *hdr,
 560                                           struct lu_device *dev)
 561{
 562        struct echo_object *eco;
 563        struct lu_object *obj = NULL;
 564
 565        /* we're the top dev. */
 566        LASSERT(hdr == NULL);
 567        OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
 568        if (eco != NULL) {
 569                struct cl_object_header *hdr = &eco->eo_hdr;
 570
 571                obj = &echo_obj2cl(eco)->co_lu;
 572                cl_object_header_init(hdr);
 573                lu_object_init(obj, &hdr->coh_lu, dev);
 574                lu_object_add_top(&hdr->coh_lu, obj);
 575
 576                eco->eo_cl.co_ops = &echo_cl_obj_ops;
 577                obj->lo_ops       = &echo_lu_obj_ops;
 578        }
 579        return obj;
 580}
 581
 582static struct lu_device_operations echo_device_lu_ops = {
 583        .ldo_object_alloc   = echo_object_alloc,
 584};
 585
 586/** @} echo_lu_dev_ops */
 587
 588static struct cl_device_operations echo_device_cl_ops = {
 589};
 590
 591/** \defgroup echo_init Setup and teardown
 592 *
 593 * Init and fini functions for echo client.
 594 *
 595 * @{
 596 */
 597static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
 598{
 599        struct cl_site *site = &ed->ed_site_myself;
 600        int rc;
 601
 602        /* initialize site */
 603        rc = cl_site_init(site, &ed->ed_cl);
 604        if (rc) {
 605                CERROR("Cannot initialize site for echo client(%d)\n", rc);
 606                return rc;
 607        }
 608
 609        rc = lu_site_init_finish(&site->cs_lu);
 610        if (rc)
 611                return rc;
 612
 613        ed->ed_site = site;
 614        return 0;
 615}
 616
 617static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
 618{
 619        if (ed->ed_site) {
 620                cl_site_fini(ed->ed_site);
 621                ed->ed_site = NULL;
 622        }
 623}
 624
 625static void *echo_thread_key_init(const struct lu_context *ctx,
 626                          struct lu_context_key *key)
 627{
 628        struct echo_thread_info *info;
 629
 630        OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
 631        if (info == NULL)
 632                info = ERR_PTR(-ENOMEM);
 633        return info;
 634}
 635
 636static void echo_thread_key_fini(const struct lu_context *ctx,
 637                         struct lu_context_key *key, void *data)
 638{
 639        struct echo_thread_info *info = data;
 640        OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
 641}
 642
 643static void echo_thread_key_exit(const struct lu_context *ctx,
 644                         struct lu_context_key *key, void *data)
 645{
 646}
 647
 648static struct lu_context_key echo_thread_key = {
 649        .lct_tags = LCT_CL_THREAD,
 650        .lct_init = echo_thread_key_init,
 651        .lct_fini = echo_thread_key_fini,
 652        .lct_exit = echo_thread_key_exit
 653};
 654
 655static void *echo_session_key_init(const struct lu_context *ctx,
 656                                  struct lu_context_key *key)
 657{
 658        struct echo_session_info *session;
 659
 660        OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
 661        if (session == NULL)
 662                session = ERR_PTR(-ENOMEM);
 663        return session;
 664}
 665
 666static void echo_session_key_fini(const struct lu_context *ctx,
 667                                 struct lu_context_key *key, void *data)
 668{
 669        struct echo_session_info *session = data;
 670        OBD_SLAB_FREE_PTR(session, echo_session_kmem);
 671}
 672
 673static void echo_session_key_exit(const struct lu_context *ctx,
 674                                 struct lu_context_key *key, void *data)
 675{
 676}
 677
 678static struct lu_context_key echo_session_key = {
 679        .lct_tags = LCT_SESSION,
 680        .lct_init = echo_session_key_init,
 681        .lct_fini = echo_session_key_fini,
 682        .lct_exit = echo_session_key_exit
 683};
 684
 685LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
 686
 687static struct lu_device *echo_device_alloc(const struct lu_env *env,
 688                                           struct lu_device_type *t,
 689                                           struct lustre_cfg *cfg)
 690{
 691        struct lu_device   *next;
 692        struct echo_device *ed;
 693        struct cl_device   *cd;
 694        struct obd_device  *obd = NULL; /* to keep compiler happy */
 695        struct obd_device  *tgt;
 696        const char *tgt_type_name;
 697        int rc;
 698        int cleanup = 0;
 699
 700        OBD_ALLOC_PTR(ed);
 701        if (ed == NULL) {
 702                rc = -ENOMEM;
 703                goto out;
 704        }
 705
 706        cleanup = 1;
 707        cd = &ed->ed_cl;
 708        rc = cl_device_init(cd, t);
 709        if (rc)
 710                goto out;
 711
 712        cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
 713        cd->cd_ops = &echo_device_cl_ops;
 714
 715        cleanup = 2;
 716        obd = class_name2obd(lustre_cfg_string(cfg, 0));
 717        LASSERT(obd != NULL);
 718        LASSERT(env != NULL);
 719
 720        tgt = class_name2obd(lustre_cfg_string(cfg, 1));
 721        if (tgt == NULL) {
 722                CERROR("Can not find tgt device %s\n",
 723                        lustre_cfg_string(cfg, 1));
 724                rc = -ENODEV;
 725                goto out;
 726        }
 727
 728        next = tgt->obd_lu_dev;
 729        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
 730                CERROR("echo MDT client must be run on server\n");
 731                rc = -EOPNOTSUPP;
 732                goto out;
 733        }
 734
 735        rc = echo_site_init(env, ed);
 736        if (rc)
 737                goto out;
 738
 739        cleanup = 3;
 740
 741        rc = echo_client_setup(env, obd, cfg);
 742        if (rc)
 743                goto out;
 744
 745        ed->ed_ec = &obd->u.echo_client;
 746        cleanup = 4;
 747
 748        /* if echo client is to be stacked upon ost device, the next is
 749         * NULL since ost is not a clio device so far */
 750        if (next != NULL && !lu_device_is_cl(next))
 751                next = NULL;
 752
 753        tgt_type_name = tgt->obd_type->typ_name;
 754        if (next != NULL) {
 755                LASSERT(next != NULL);
 756                if (next->ld_site != NULL) {
 757                        rc = -EBUSY;
 758                        goto out;
 759                }
 760
 761                next->ld_site = &ed->ed_site->cs_lu;
 762                rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
 763                                                next->ld_type->ldt_name,
 764                                                              NULL);
 765                if (rc)
 766                        goto out;
 767
 768                /* Tricky case, I have to determine the obd type since
 769                 * CLIO uses the different parameters to initialize
 770                 * objects for lov & osc. */
 771                if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
 772                        ed->ed_next_islov = 1;
 773                else
 774                        LASSERT(strcmp(tgt_type_name,
 775                                       LUSTRE_OSC_NAME) == 0);
 776        } else {
 777                LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
 778        }
 779
 780        ed->ed_next = next;
 781        return &cd->cd_lu_dev;
 782out:
 783        switch (cleanup) {
 784        case 4: {
 785                int rc2;
 786                rc2 = echo_client_cleanup(obd);
 787                if (rc2)
 788                        CERROR("Cleanup obd device %s error(%d)\n",
 789                               obd->obd_name, rc2);
 790        }
 791
 792        case 3:
 793                echo_site_fini(env, ed);
 794        case 2:
 795                cl_device_fini(&ed->ed_cl);
 796        case 1:
 797                OBD_FREE_PTR(ed);
 798        case 0:
 799        default:
 800                break;
 801        }
 802        return ERR_PTR(rc);
 803}
 804
 805static int echo_device_init(const struct lu_env *env, struct lu_device *d,
 806                          const char *name, struct lu_device *next)
 807{
 808        LBUG();
 809        return 0;
 810}
 811
 812static struct lu_device *echo_device_fini(const struct lu_env *env,
 813                                          struct lu_device *d)
 814{
 815        struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
 816        struct lu_device *next = ed->ed_next;
 817
 818        while (next)
 819                next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
 820        return NULL;
 821}
 822
 823static void echo_lock_release(const struct lu_env *env,
 824                              struct echo_lock *ecl,
 825                              int still_used)
 826{
 827        struct cl_lock *clk = echo_lock2cl(ecl);
 828
 829        cl_lock_get(clk);
 830        cl_unuse(env, clk);
 831        cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
 832        if (!still_used) {
 833                cl_lock_mutex_get(env, clk);
 834                cl_lock_cancel(env, clk);
 835                cl_lock_delete(env, clk);
 836                cl_lock_mutex_put(env, clk);
 837        }
 838        cl_lock_put(env, clk);
 839}
 840
 841static struct lu_device *echo_device_free(const struct lu_env *env,
 842                                          struct lu_device *d)
 843{
 844        struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
 845        struct echo_client_obd *ec   = ed->ed_ec;
 846        struct echo_object     *eco;
 847        struct lu_device       *next = ed->ed_next;
 848
 849        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
 850               ed, next);
 851
 852        lu_site_purge(env, &ed->ed_site->cs_lu, -1);
 853
 854        /* check if there are objects still alive.
 855         * It shouldn't have any object because lu_site_purge would cleanup
 856         * all of cached objects. Anyway, probably the echo device is being
 857         * parallelly accessed.
 858         */
 859        spin_lock(&ec->ec_lock);
 860        list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
 861                eco->eo_deleted = 1;
 862        spin_unlock(&ec->ec_lock);
 863
 864        /* purge again */
 865        lu_site_purge(env, &ed->ed_site->cs_lu, -1);
 866
 867        CDEBUG(D_INFO,
 868               "Waiting for the reference of echo object to be dropped\n");
 869
 870        /* Wait for the last reference to be dropped. */
 871        spin_lock(&ec->ec_lock);
 872        while (!list_empty(&ec->ec_objects)) {
 873                spin_unlock(&ec->ec_lock);
 874                CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
 875                set_current_state(TASK_UNINTERRUPTIBLE);
 876                schedule_timeout(cfs_time_seconds(1));
 877                lu_site_purge(env, &ed->ed_site->cs_lu, -1);
 878                spin_lock(&ec->ec_lock);
 879        }
 880        spin_unlock(&ec->ec_lock);
 881
 882        LASSERT(list_empty(&ec->ec_locks));
 883
 884        CDEBUG(D_INFO, "No object exists, exiting...\n");
 885
 886        echo_client_cleanup(d->ld_obd);
 887
 888        while (next)
 889                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
 890
 891        LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
 892        echo_site_fini(env, ed);
 893        cl_device_fini(&ed->ed_cl);
 894        OBD_FREE_PTR(ed);
 895
 896        return NULL;
 897}
 898
 899static const struct lu_device_type_operations echo_device_type_ops = {
 900        .ldto_init = echo_type_init,
 901        .ldto_fini = echo_type_fini,
 902
 903        .ldto_start = echo_type_start,
 904        .ldto_stop  = echo_type_stop,
 905
 906        .ldto_device_alloc = echo_device_alloc,
 907        .ldto_device_free  = echo_device_free,
 908        .ldto_device_init  = echo_device_init,
 909        .ldto_device_fini  = echo_device_fini
 910};
 911
 912static struct lu_device_type echo_device_type = {
 913        .ldt_tags     = LU_DEVICE_CL,
 914        .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
 915        .ldt_ops      = &echo_device_type_ops,
 916        .ldt_ctx_tags = LCT_CL_THREAD,
 917};
 918/** @} echo_init */
 919
 920/** \defgroup echo_exports Exported operations
 921 *
 922 * exporting functions to echo client
 923 *
 924 * @{
 925 */
 926
 927/* Interfaces to echo client obd device */
 928static struct echo_object *cl_echo_object_find(struct echo_device *d,
 929                                               struct lov_stripe_md **lsmp)
 930{
 931        struct lu_env *env;
 932        struct echo_thread_info *info;
 933        struct echo_object_conf *conf;
 934        struct lov_stripe_md    *lsm;
 935        struct echo_object *eco;
 936        struct cl_object   *obj;
 937        struct lu_fid *fid;
 938        int refcheck;
 939        int rc;
 940
 941        LASSERT(lsmp);
 942        lsm = *lsmp;
 943        LASSERT(lsm);
 944        LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
 945        LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
 946                 POSTID(&lsm->lsm_oi));
 947
 948        /* Never return an object if the obd is to be freed. */
 949        if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
 950                return ERR_PTR(-ENODEV);
 951
 952        env = cl_env_get(&refcheck);
 953        if (IS_ERR(env))
 954                return (void *)env;
 955
 956        info = echo_env_info(env);
 957        conf = &info->eti_conf;
 958        if (d->ed_next) {
 959                if (!d->ed_next_islov) {
 960                        struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
 961                        LASSERT(oinfo != NULL);
 962                        oinfo->loi_oi = lsm->lsm_oi;
 963                        conf->eoc_cl.u.coc_oinfo = oinfo;
 964                } else {
 965                        struct lustre_md *md;
 966                        md = &info->eti_md;
 967                        memset(md, 0, sizeof(*md));
 968                        md->lsm = lsm;
 969                        conf->eoc_cl.u.coc_md = md;
 970                }
 971        }
 972        conf->eoc_md = lsmp;
 973
 974        fid  = &info->eti_fid;
 975        rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
 976        if (rc != 0) {
 977                eco = ERR_PTR(rc);
 978                goto out;
 979        }
 980
 981        /* In the function below, .hs_keycmp resolves to
 982         * lu_obj_hop_keycmp() */
 983        /* coverity[overrun-buffer-val] */
 984        obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
 985        if (IS_ERR(obj)) {
 986                eco = (void *)obj;
 987                goto out;
 988        }
 989
 990        eco = cl2echo_obj(obj);
 991        if (eco->eo_deleted) {
 992                cl_object_put(env, obj);
 993                eco = ERR_PTR(-EAGAIN);
 994        }
 995
 996out:
 997        cl_env_put(env, &refcheck);
 998        return eco;
 999}
1000
1001static int cl_echo_object_put(struct echo_object *eco)
1002{
1003        struct lu_env *env;
1004        struct cl_object *obj = echo_obj2cl(eco);
1005        int refcheck;
1006
1007        env = cl_env_get(&refcheck);
1008        if (IS_ERR(env))
1009                return PTR_ERR(env);
1010
1011        /* an external function to kill an object? */
1012        if (eco->eo_deleted) {
1013                struct lu_object_header *loh = obj->co_lu.lo_header;
1014                LASSERT(&eco->eo_hdr == luh2coh(loh));
1015                set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1016        }
1017
1018        cl_object_put(env, obj);
1019        cl_env_put(env, &refcheck);
1020        return 0;
1021}
1022
1023static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1024                            u64 start, u64 end, int mode,
1025                            __u64 *cookie , __u32 enqflags)
1026{
1027        struct cl_io *io;
1028        struct cl_lock *lck;
1029        struct cl_object *obj;
1030        struct cl_lock_descr *descr;
1031        struct echo_thread_info *info;
1032        int rc = -ENOMEM;
1033
1034        info = echo_env_info(env);
1035        io = &info->eti_io;
1036        descr = &info->eti_descr;
1037        obj = echo_obj2cl(eco);
1038
1039        descr->cld_obj   = obj;
1040        descr->cld_start = cl_index(obj, start);
1041        descr->cld_end   = cl_index(obj, end);
1042        descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1043        descr->cld_enq_flags = enqflags;
1044        io->ci_obj = obj;
1045
1046        lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1047        if (lck) {
1048                struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1049                struct echo_lock *el;
1050
1051                rc = cl_wait(env, lck);
1052                if (rc == 0) {
1053                        el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1054                        spin_lock(&ec->ec_lock);
1055                        if (list_empty(&el->el_chain)) {
1056                                list_add(&el->el_chain, &ec->ec_locks);
1057                                el->el_cookie = ++ec->ec_unique;
1058                        }
1059                        atomic_inc(&el->el_refcount);
1060                        *cookie = el->el_cookie;
1061                        spin_unlock(&ec->ec_lock);
1062                } else {
1063                        cl_lock_release(env, lck, "ec enqueue", current);
1064                }
1065        }
1066        return rc;
1067}
1068
1069static int cl_echo_enqueue(struct echo_object *eco, u64 start, u64 end,
1070                           int mode, __u64 *cookie)
1071{
1072        struct echo_thread_info *info;
1073        struct lu_env *env;
1074        struct cl_io *io;
1075        int refcheck;
1076        int result;
1077
1078        env = cl_env_get(&refcheck);
1079        if (IS_ERR(env))
1080                return PTR_ERR(env);
1081
1082        info = echo_env_info(env);
1083        io = &info->eti_io;
1084
1085        io->ci_ignore_layout = 1;
1086        result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1087        if (result < 0)
1088                goto out;
1089        LASSERT(result == 0);
1090
1091        result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1092        cl_io_fini(env, io);
1093
1094out:
1095        cl_env_put(env, &refcheck);
1096        return result;
1097}
1098
1099static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1100                           __u64 cookie)
1101{
1102        struct echo_client_obd *ec = ed->ed_ec;
1103        struct echo_lock       *ecl = NULL;
1104        struct list_head             *el;
1105        int found = 0, still_used = 0;
1106
1107        LASSERT(ec != NULL);
1108        spin_lock(&ec->ec_lock);
1109        list_for_each (el, &ec->ec_locks) {
1110                ecl = list_entry (el, struct echo_lock, el_chain);
1111                CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1112                found = (ecl->el_cookie == cookie);
1113                if (found) {
1114                        if (atomic_dec_and_test(&ecl->el_refcount))
1115                                list_del_init(&ecl->el_chain);
1116                        else
1117                                still_used = 1;
1118                        break;
1119                }
1120        }
1121        spin_unlock(&ec->ec_lock);
1122
1123        if (!found)
1124                return -ENOENT;
1125
1126        echo_lock_release(env, ecl, still_used);
1127        return 0;
1128}
1129
1130static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1131{
1132        struct lu_env *env;
1133        int refcheck;
1134        int rc;
1135
1136        env = cl_env_get(&refcheck);
1137        if (IS_ERR(env))
1138                return PTR_ERR(env);
1139
1140        rc = cl_echo_cancel0(env, ed, cookie);
1141
1142        cl_env_put(env, &refcheck);
1143        return rc;
1144}
1145
1146static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1147                             enum cl_req_type unused, struct cl_2queue *queue)
1148{
1149        struct cl_page *clp;
1150        struct cl_page *temp;
1151        int result = 0;
1152
1153        cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1154                int rc;
1155                rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1156                if (rc == 0)
1157                        continue;
1158                result = result ?: rc;
1159        }
1160        return result;
1161}
1162
1163static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1164                              struct page **pages, int npages, int async)
1165{
1166        struct lu_env      *env;
1167        struct echo_thread_info *info;
1168        struct cl_object        *obj = echo_obj2cl(eco);
1169        struct echo_device      *ed  = eco->eo_dev;
1170        struct cl_2queue        *queue;
1171        struct cl_io        *io;
1172        struct cl_page    *clp;
1173        struct lustre_handle    lh = { 0 };
1174        int page_size = cl_page_size(obj);
1175        int refcheck;
1176        int rc;
1177        int i;
1178
1179        LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1180        LASSERT(ed->ed_next != NULL);
1181        env = cl_env_get(&refcheck);
1182        if (IS_ERR(env))
1183                return PTR_ERR(env);
1184
1185        info    = echo_env_info(env);
1186        io      = &info->eti_io;
1187        queue   = &info->eti_queue;
1188
1189        cl_2queue_init(queue);
1190
1191        io->ci_ignore_layout = 1;
1192        rc = cl_io_init(env, io, CIT_MISC, obj);
1193        if (rc < 0)
1194                goto out;
1195        LASSERT(rc == 0);
1196
1197
1198        rc = cl_echo_enqueue0(env, eco, offset,
1199                              offset + npages * PAGE_CACHE_SIZE - 1,
1200                              rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1201                              CEF_NEVER);
1202        if (rc < 0)
1203                goto error_lock;
1204
1205        for (i = 0; i < npages; i++) {
1206                LASSERT(pages[i]);
1207                clp = cl_page_find(env, obj, cl_index(obj, offset),
1208                                   pages[i], CPT_TRANSIENT);
1209                if (IS_ERR(clp)) {
1210                        rc = PTR_ERR(clp);
1211                        break;
1212                }
1213                LASSERT(clp->cp_type == CPT_TRANSIENT);
1214
1215                rc = cl_page_own(env, io, clp);
1216                if (rc) {
1217                        LASSERT(clp->cp_state == CPS_FREEING);
1218                        cl_page_put(env, clp);
1219                        break;
1220                }
1221
1222                cl_2queue_add(queue, clp);
1223
1224                /* drop the reference count for cl_page_find, so that the page
1225                 * will be freed in cl_2queue_fini. */
1226                cl_page_put(env, clp);
1227                cl_page_clip(env, clp, 0, page_size);
1228
1229                offset += page_size;
1230        }
1231
1232        if (rc == 0) {
1233                enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1234
1235                async = async && (typ == CRT_WRITE);
1236                if (async)
1237                        rc = cl_echo_async_brw(env, io, typ, queue);
1238                else
1239                        rc = cl_io_submit_sync(env, io, typ, queue, 0);
1240                CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1241                       async ? "async" : "sync", rc);
1242        }
1243
1244        cl_echo_cancel0(env, ed, lh.cookie);
1245error_lock:
1246        cl_2queue_discard(env, io, queue);
1247        cl_2queue_disown(env, io, queue);
1248        cl_2queue_fini(env, queue);
1249        cl_io_fini(env, io);
1250out:
1251        cl_env_put(env, &refcheck);
1252        return rc;
1253}
1254/** @} echo_exports */
1255
1256
1257static u64 last_object_id;
1258
1259static int
1260echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1261{
1262        struct lov_stripe_md *ulsm = _ulsm;
1263        int nob, i;
1264
1265        nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1266        if (nob > ulsm_nob)
1267                return -EINVAL;
1268
1269        if (copy_to_user (ulsm, lsm, sizeof(*ulsm)))
1270                return -EFAULT;
1271
1272        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1273                if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
1274                                      sizeof(lsm->lsm_oinfo[0])))
1275                        return -EFAULT;
1276        }
1277        return 0;
1278}
1279
1280static int
1281echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
1282                 void *ulsm, int ulsm_nob)
1283{
1284        struct echo_client_obd *ec = ed->ed_ec;
1285        int                  i;
1286
1287        if (ulsm_nob < sizeof (*lsm))
1288                return -EINVAL;
1289
1290        if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
1291                return -EFAULT;
1292
1293        if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1294            lsm->lsm_magic != LOV_MAGIC ||
1295            (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1296            ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1297                return -EINVAL;
1298
1299
1300        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1301                if (copy_from_user(lsm->lsm_oinfo[i],
1302                                       ((struct lov_stripe_md *)ulsm)-> \
1303                                       lsm_oinfo[i],
1304                                       sizeof(lsm->lsm_oinfo[0])))
1305                        return -EFAULT;
1306        }
1307        return 0;
1308}
1309
1310static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1311                              int on_target, struct obdo *oa, void *ulsm,
1312                              int ulsm_nob, struct obd_trans_info *oti)
1313{
1314        struct echo_object     *eco;
1315        struct echo_client_obd *ec = ed->ed_ec;
1316        struct lov_stripe_md   *lsm = NULL;
1317        int                  rc;
1318        int                  created = 0;
1319
1320        if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
1321            (on_target ||                      /* set_stripe */
1322             ec->ec_nstripes != 0)) {      /* LOV */
1323                CERROR ("No valid oid\n");
1324                return -EINVAL;
1325        }
1326
1327        rc = echo_alloc_memmd(ed, &lsm);
1328        if (rc < 0) {
1329                CERROR("Cannot allocate md: rc = %d\n", rc);
1330                goto failed;
1331        }
1332
1333        if (ulsm != NULL) {
1334                int i, idx;
1335
1336                rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
1337                if (rc != 0)
1338                        goto failed;
1339
1340                if (lsm->lsm_stripe_count == 0)
1341                        lsm->lsm_stripe_count = ec->ec_nstripes;
1342
1343                if (lsm->lsm_stripe_size == 0)
1344                        lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
1345
1346                idx = cfs_rand();
1347
1348                /* setup stripes: indices + default ids if required */
1349                for (i = 0; i < lsm->lsm_stripe_count; i++) {
1350                        if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
1351                                lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
1352
1353                        lsm->lsm_oinfo[i]->loi_ost_idx =
1354                                (idx + i) % ec->ec_nstripes;
1355                }
1356        }
1357
1358        /* setup object ID here for !on_target and LOV hint */
1359        if (oa->o_valid & OBD_MD_FLID) {
1360                LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1361                lsm->lsm_oi = oa->o_oi;
1362        }
1363
1364        if (ostid_id(&lsm->lsm_oi) == 0)
1365                ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1366
1367        rc = 0;
1368        if (on_target) {
1369                /* Only echo objects are allowed to be created */
1370                LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
1371                        (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
1372                rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1373                if (rc != 0) {
1374                        CERROR("Cannot create objects: rc = %d\n", rc);
1375                        goto failed;
1376                }
1377                created = 1;
1378        }
1379
1380        /* See what object ID we were given */
1381        oa->o_oi = lsm->lsm_oi;
1382        oa->o_valid |= OBD_MD_FLID;
1383
1384        eco = cl_echo_object_find(ed, &lsm);
1385        if (IS_ERR(eco)) {
1386                rc = PTR_ERR(eco);
1387                goto failed;
1388        }
1389        cl_echo_object_put(eco);
1390
1391        CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1392
1393 failed:
1394        if (created && rc)
1395                obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
1396        if (lsm)
1397                echo_free_memmd(ed, &lsm);
1398        if (rc)
1399                CERROR("create object failed with: rc = %d\n", rc);
1400        return rc;
1401}
1402
1403static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1404                           struct obdo *oa)
1405{
1406        struct lov_stripe_md   *lsm = NULL;
1407        struct echo_object     *eco;
1408        int                  rc;
1409
1410        if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1411                /* disallow use of object id 0 */
1412                CERROR ("No valid oid\n");
1413                return -EINVAL;
1414        }
1415
1416        rc = echo_alloc_memmd(ed, &lsm);
1417        if (rc < 0)
1418                return rc;
1419
1420        lsm->lsm_oi = oa->o_oi;
1421        if (!(oa->o_valid & OBD_MD_FLGROUP))
1422                ostid_set_seq_echo(&lsm->lsm_oi);
1423
1424        rc = 0;
1425        eco = cl_echo_object_find(ed, &lsm);
1426        if (!IS_ERR(eco))
1427                *ecop = eco;
1428        else
1429                rc = PTR_ERR(eco);
1430        if (lsm)
1431                echo_free_memmd(ed, &lsm);
1432        return rc;
1433}
1434
1435static void echo_put_object(struct echo_object *eco)
1436{
1437        if (cl_echo_object_put(eco))
1438                CERROR("echo client: drop an object failed");
1439}
1440
1441static void
1442echo_get_stripe_off_id(struct lov_stripe_md *lsm, u64 *offp, u64 *idp)
1443{
1444        unsigned long stripe_count;
1445        unsigned long stripe_size;
1446        unsigned long width;
1447        unsigned long woffset;
1448        int        stripe_index;
1449        u64       offset;
1450
1451        if (lsm->lsm_stripe_count <= 1)
1452                return;
1453
1454        offset       = *offp;
1455        stripe_size  = lsm->lsm_stripe_size;
1456        stripe_count = lsm->lsm_stripe_count;
1457
1458        /* width = # bytes in all stripes */
1459        width = stripe_size * stripe_count;
1460
1461        /* woffset = offset within a width; offset = whole number of widths */
1462        woffset = do_div (offset, width);
1463
1464        stripe_index = woffset / stripe_size;
1465
1466        *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
1467        *offp = offset * stripe_size + woffset % stripe_size;
1468}
1469
1470static void
1471echo_client_page_debug_setup(struct lov_stripe_md *lsm,
1472                             struct page *page, int rw, u64 id,
1473                             u64 offset, u64 count)
1474{
1475        char    *addr;
1476        u64      stripe_off;
1477        u64      stripe_id;
1478        int      delta;
1479
1480        /* no partial pages on the client */
1481        LASSERT(count == PAGE_CACHE_SIZE);
1482
1483        addr = kmap(page);
1484
1485        for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1486                if (rw == OBD_BRW_WRITE) {
1487                        stripe_off = offset + delta;
1488                        stripe_id = id;
1489                        echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1490                } else {
1491                        stripe_off = 0xdeadbeef00c0ffeeULL;
1492                        stripe_id = 0xdeadbeef00c0ffeeULL;
1493                }
1494                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1495                                  stripe_off, stripe_id);
1496        }
1497
1498        kunmap(page);
1499}
1500
1501static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
1502                                        struct page *page, u64 id,
1503                                        u64 offset, u64 count)
1504{
1505        u64     stripe_off;
1506        u64     stripe_id;
1507        char   *addr;
1508        int     delta;
1509        int     rc;
1510        int     rc2;
1511
1512        /* no partial pages on the client */
1513        LASSERT(count == PAGE_CACHE_SIZE);
1514
1515        addr = kmap(page);
1516
1517        for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1518                stripe_off = offset + delta;
1519                stripe_id = id;
1520                echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
1521
1522                rc2 = block_debug_check("test_brw",
1523                                        addr + delta, OBD_ECHO_BLOCK_SIZE,
1524                                        stripe_off, stripe_id);
1525                if (rc2 != 0) {
1526                        CERROR ("Error in echo object %#llx\n", id);
1527                        rc = rc2;
1528                }
1529        }
1530
1531        kunmap(page);
1532        return rc;
1533}
1534
1535static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1536                            struct echo_object *eco, u64 offset,
1537                            u64 count, int async,
1538                            struct obd_trans_info *oti)
1539{
1540        struct lov_stripe_md   *lsm = eco->eo_lsm;
1541        u32            npages;
1542        struct brw_page *pga;
1543        struct brw_page *pgp;
1544        struct page         **pages;
1545        u64              off;
1546        int                  i;
1547        int                  rc;
1548        int                  verify;
1549        gfp_t                gfp_mask;
1550        int                  brw_flags = 0;
1551
1552        verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1553                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1554                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1555
1556        gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
1557
1558        LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1559        LASSERT(lsm != NULL);
1560        LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
1561
1562        if (count <= 0 ||
1563            (count & (~CFS_PAGE_MASK)) != 0)
1564                return -EINVAL;
1565
1566        /* XXX think again with misaligned I/O */
1567        npages = count >> PAGE_CACHE_SHIFT;
1568
1569        if (rw == OBD_BRW_WRITE)
1570                brw_flags = OBD_BRW_ASYNC;
1571
1572        OBD_ALLOC(pga, npages * sizeof(*pga));
1573        if (pga == NULL)
1574                return -ENOMEM;
1575
1576        OBD_ALLOC(pages, npages * sizeof(*pages));
1577        if (pages == NULL) {
1578                OBD_FREE(pga, npages * sizeof(*pga));
1579                return -ENOMEM;
1580        }
1581
1582        for (i = 0, pgp = pga, off = offset;
1583             i < npages;
1584             i++, pgp++, off += PAGE_CACHE_SIZE) {
1585
1586                LASSERT (pgp->pg == NULL);      /* for cleanup */
1587
1588                rc = -ENOMEM;
1589                OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
1590                if (pgp->pg == NULL)
1591                        goto out;
1592
1593                pages[i] = pgp->pg;
1594                pgp->count = PAGE_CACHE_SIZE;
1595                pgp->off = off;
1596                pgp->flag = brw_flags;
1597
1598                if (verify)
1599                        echo_client_page_debug_setup(lsm, pgp->pg, rw,
1600                                                     ostid_id(&oa->o_oi), off,
1601                                                     pgp->count);
1602        }
1603
1604        /* brw mode can only be used at client */
1605        LASSERT(ed->ed_next != NULL);
1606        rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1607
1608 out:
1609        if (rc != 0 || rw != OBD_BRW_READ)
1610                verify = 0;
1611
1612        for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1613                if (pgp->pg == NULL)
1614                        continue;
1615
1616                if (verify) {
1617                        int vrc;
1618                        vrc = echo_client_page_debug_check(lsm, pgp->pg,
1619                                                           ostid_id(&oa->o_oi),
1620                                                           pgp->off, pgp->count);
1621                        if (vrc != 0 && rc == 0)
1622                                rc = vrc;
1623                }
1624                OBD_PAGE_FREE(pgp->pg);
1625        }
1626        OBD_FREE(pga, npages * sizeof(*pga));
1627        OBD_FREE(pages, npages * sizeof(*pages));
1628        return rc;
1629}
1630
1631static int echo_client_prep_commit(const struct lu_env *env,
1632                                   struct obd_export *exp, int rw,
1633                                   struct obdo *oa, struct echo_object *eco,
1634                                   u64 offset, u64 count,
1635                                   u64 batch, struct obd_trans_info *oti,
1636                                   int async)
1637{
1638        struct lov_stripe_md *lsm = eco->eo_lsm;
1639        struct obd_ioobj ioo;
1640        struct niobuf_local *lnb;
1641        struct niobuf_remote *rnb;
1642        u64 off;
1643        u64 npages, tot_pages;
1644        int i, ret = 0, brw_flags = 0;
1645
1646        if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
1647            (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
1648                return -EINVAL;
1649
1650        npages = batch >> PAGE_CACHE_SHIFT;
1651        tot_pages = count >> PAGE_CACHE_SHIFT;
1652
1653        OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
1654        OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
1655
1656        if (lnb == NULL || rnb == NULL) {
1657                ret = -ENOMEM;
1658                goto out;
1659        }
1660
1661        if (rw == OBD_BRW_WRITE && async)
1662                brw_flags |= OBD_BRW_ASYNC;
1663
1664        obdo_to_ioobj(oa, &ioo);
1665
1666        off = offset;
1667
1668        for (; tot_pages; tot_pages -= npages) {
1669                int lpages;
1670
1671                if (tot_pages < npages)
1672                        npages = tot_pages;
1673
1674                for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
1675                        rnb[i].offset = off;
1676                        rnb[i].len = PAGE_CACHE_SIZE;
1677                        rnb[i].flags = brw_flags;
1678                }
1679
1680                ioo.ioo_bufcnt = npages;
1681                oti->oti_transno = 0;
1682
1683                lpages = npages;
1684                ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1685                                 lnb, oti, NULL);
1686                if (ret != 0)
1687                        goto out;
1688                LASSERT(lpages == npages);
1689
1690                for (i = 0; i < lpages; i++) {
1691                        struct page *page = lnb[i].page;
1692
1693                        /* read past eof? */
1694                        if (page == NULL && lnb[i].rc == 0)
1695                                continue;
1696
1697                        if (async)
1698                                lnb[i].flags |= OBD_BRW_ASYNC;
1699
1700                        if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1701                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1702                            (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1703                                continue;
1704
1705                        if (rw == OBD_BRW_WRITE)
1706                                echo_client_page_debug_setup(lsm, page, rw,
1707                                                            ostid_id(&oa->o_oi),
1708                                                             rnb[i].offset,
1709                                                             rnb[i].len);
1710                        else
1711                                echo_client_page_debug_check(lsm, page,
1712                                                            ostid_id(&oa->o_oi),
1713                                                             rnb[i].offset,
1714                                                             rnb[i].len);
1715                }
1716
1717                ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1718                                   rnb, npages, lnb, oti, ret);
1719                if (ret != 0)
1720                        goto out;
1721
1722                /* Reset oti otherwise it would confuse ldiskfs. */
1723                memset(oti, 0, sizeof(*oti));
1724
1725                /* Reuse env context. */
1726                lu_context_exit((struct lu_context *)&env->le_ctx);
1727                lu_context_enter((struct lu_context *)&env->le_ctx);
1728        }
1729
1730out:
1731        if (lnb)
1732                OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
1733        if (rnb)
1734                OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
1735        return ret;
1736}
1737
1738static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1739                                 struct obd_export *exp,
1740                                 struct obd_ioctl_data *data,
1741                                 struct obd_trans_info *dummy_oti)
1742{
1743        struct obd_device *obd = class_exp2obd(exp);
1744        struct echo_device *ed = obd2echo_dev(obd);
1745        struct echo_client_obd *ec = ed->ed_ec;
1746        struct obdo *oa = &data->ioc_obdo1;
1747        struct echo_object *eco;
1748        int rc;
1749        int async = 1;
1750        long test_mode;
1751
1752        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1753
1754        rc = echo_get_object(&eco, ed, oa);
1755        if (rc)
1756                return rc;
1757
1758        oa->o_valid &= ~OBD_MD_FLHANDLE;
1759
1760        /* OFD/obdfilter works only via prep/commit */
1761        test_mode = (long)data->ioc_pbuf1;
1762        if (test_mode == 1)
1763                async = 0;
1764
1765        if (ed->ed_next == NULL && test_mode != 3) {
1766                test_mode = 3;
1767                data->ioc_plen1 = data->ioc_count;
1768        }
1769
1770        /* Truncate batch size to maximum */
1771        if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1772                data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1773
1774        switch (test_mode) {
1775        case 1:
1776                /* fall through */
1777        case 2:
1778                rc = echo_client_kbrw(ed, rw, oa,
1779                                      eco, data->ioc_offset,
1780                                      data->ioc_count, async, dummy_oti);
1781                break;
1782        case 3:
1783                rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1784                                             eco, data->ioc_offset,
1785                                             data->ioc_count, data->ioc_plen1,
1786                                             dummy_oti, async);
1787                break;
1788        default:
1789                rc = -EINVAL;
1790        }
1791        echo_put_object(eco);
1792        return rc;
1793}
1794
1795static int
1796echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1797                    int mode, u64 offset, u64 nob)
1798{
1799        struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
1800        struct lustre_handle   *ulh = &oa->o_handle;
1801        struct echo_object     *eco;
1802        u64              end;
1803        int                  rc;
1804
1805        if (ed->ed_next == NULL)
1806                return -EOPNOTSUPP;
1807
1808        if (!(mode == LCK_PR || mode == LCK_PW))
1809                return -EINVAL;
1810
1811        if ((offset & (~CFS_PAGE_MASK)) != 0 ||
1812            (nob & (~CFS_PAGE_MASK)) != 0)
1813                return -EINVAL;
1814
1815        rc = echo_get_object (&eco, ed, oa);
1816        if (rc != 0)
1817                return rc;
1818
1819        end = (nob == 0) ? ((u64) -1) : (offset + nob - 1);
1820        rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
1821        if (rc == 0) {
1822                oa->o_valid |= OBD_MD_FLHANDLE;
1823                CDEBUG(D_INFO, "Cookie is %#llx\n", ulh->cookie);
1824        }
1825        echo_put_object(eco);
1826        return rc;
1827}
1828
1829static int
1830echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1831{
1832        struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
1833        __u64          cookie = oa->o_handle.cookie;
1834
1835        if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1836                return -EINVAL;
1837
1838        CDEBUG(D_INFO, "Cookie is %#llx\n", cookie);
1839        return cl_echo_cancel(ed, cookie);
1840}
1841
1842static int
1843echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1844                      void *karg, void *uarg)
1845{
1846        struct obd_device      *obd = exp->exp_obd;
1847        struct echo_device     *ed = obd2echo_dev(obd);
1848        struct echo_client_obd *ec = ed->ed_ec;
1849        struct echo_object     *eco;
1850        struct obd_ioctl_data  *data = karg;
1851        struct obd_trans_info   dummy_oti;
1852        struct lu_env     *env;
1853        struct oti_req_ack_lock *ack_lock;
1854        struct obdo         *oa;
1855        struct lu_fid      fid;
1856        int                  rw = OBD_BRW_READ;
1857        int                  rc = 0;
1858        int                  i;
1859
1860        memset(&dummy_oti, 0, sizeof(dummy_oti));
1861
1862        oa = &data->ioc_obdo1;
1863        if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1864                oa->o_valid |= OBD_MD_FLGROUP;
1865                ostid_set_seq_echo(&oa->o_oi);
1866        }
1867
1868        /* This FID is unpacked just for validation at this point */
1869        rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1870        if (rc < 0)
1871                return rc;
1872
1873        OBD_ALLOC_PTR(env);
1874        if (env == NULL)
1875                return -ENOMEM;
1876
1877        rc = lu_env_init(env, LCT_DT_THREAD);
1878        if (rc) {
1879                rc = -ENOMEM;
1880                goto out;
1881        }
1882
1883        switch (cmd) {
1884        case OBD_IOC_CREATE:                /* may create echo object */
1885                if (!capable(CFS_CAP_SYS_ADMIN)) {
1886                        rc = -EPERM;
1887                        goto out;
1888                }
1889
1890                rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
1891                                        data->ioc_plen1, &dummy_oti);
1892                goto out;
1893
1894        case OBD_IOC_DESTROY:
1895                if (!capable(CFS_CAP_SYS_ADMIN)) {
1896                        rc = -EPERM;
1897                        goto out;
1898                }
1899
1900                rc = echo_get_object(&eco, ed, oa);
1901                if (rc == 0) {
1902                        rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
1903                                         &dummy_oti, NULL, NULL);
1904                        if (rc == 0)
1905                                eco->eo_deleted = 1;
1906                        echo_put_object(eco);
1907                }
1908                goto out;
1909
1910        case OBD_IOC_GETATTR:
1911                rc = echo_get_object(&eco, ed, oa);
1912                if (rc == 0) {
1913                        struct obd_info oinfo = { { { 0 } } };
1914                        oinfo.oi_md = eco->eo_lsm;
1915                        oinfo.oi_oa = oa;
1916                        rc = obd_getattr(env, ec->ec_exp, &oinfo);
1917                        echo_put_object(eco);
1918                }
1919                goto out;
1920
1921        case OBD_IOC_SETATTR:
1922                if (!capable(CFS_CAP_SYS_ADMIN)) {
1923                        rc = -EPERM;
1924                        goto out;
1925                }
1926
1927                rc = echo_get_object(&eco, ed, oa);
1928                if (rc == 0) {
1929                        struct obd_info oinfo = { { { 0 } } };
1930                        oinfo.oi_oa = oa;
1931                        oinfo.oi_md = eco->eo_lsm;
1932
1933                        rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1934                        echo_put_object(eco);
1935                }
1936                goto out;
1937
1938        case OBD_IOC_BRW_WRITE:
1939                if (!capable(CFS_CAP_SYS_ADMIN)) {
1940                        rc = -EPERM;
1941                        goto out;
1942                }
1943
1944                rw = OBD_BRW_WRITE;
1945                /* fall through */
1946        case OBD_IOC_BRW_READ:
1947                rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1948                goto out;
1949
1950        case ECHO_IOC_GET_STRIPE:
1951                rc = echo_get_object(&eco, ed, oa);
1952                if (rc == 0) {
1953                        rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
1954                                              data->ioc_plen1);
1955                        echo_put_object(eco);
1956                }
1957                goto out;
1958
1959        case ECHO_IOC_SET_STRIPE:
1960                if (!capable(CFS_CAP_SYS_ADMIN)) {
1961                        rc = -EPERM;
1962                        goto out;
1963                }
1964
1965                if (data->ioc_pbuf1 == NULL) {  /* unset */
1966                        rc = echo_get_object(&eco, ed, oa);
1967                        if (rc == 0) {
1968                                eco->eo_deleted = 1;
1969                                echo_put_object(eco);
1970                        }
1971                } else {
1972                        rc = echo_create_object(env, ed, 0, oa,
1973                                                data->ioc_pbuf1,
1974                                                data->ioc_plen1, &dummy_oti);
1975                }
1976                goto out;
1977
1978        case ECHO_IOC_ENQUEUE:
1979                if (!capable(CFS_CAP_SYS_ADMIN)) {
1980                        rc = -EPERM;
1981                        goto out;
1982                }
1983
1984                rc = echo_client_enqueue(exp, oa,
1985                                         data->ioc_conn1, /* lock mode */
1986                                         data->ioc_offset,
1987                                         data->ioc_count);/*extent*/
1988                goto out;
1989
1990        case ECHO_IOC_CANCEL:
1991                rc = echo_client_cancel(exp, oa);
1992                goto out;
1993
1994        default:
1995                CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1996                rc = -ENOTTY;
1997                goto out;
1998        }
1999
2000out:
2001        lu_env_fini(env);
2002        OBD_FREE_PTR(env);
2003
2004        /* XXX this should be in a helper also called by target_send_reply */
2005        for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2006             i++, ack_lock++) {
2007                if (!ack_lock->mode)
2008                        break;
2009                ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2010        }
2011
2012        return rc;
2013}
2014
2015static int echo_client_setup(const struct lu_env *env,
2016                             struct obd_device *obddev, struct lustre_cfg *lcfg)
2017{
2018        struct echo_client_obd *ec = &obddev->u.echo_client;
2019        struct obd_device *tgt;
2020        struct obd_uuid echo_uuid = { "ECHO_UUID" };
2021        struct obd_connect_data *ocd = NULL;
2022        int rc;
2023
2024        if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2025                CERROR("requires a TARGET OBD name\n");
2026                return -EINVAL;
2027        }
2028
2029        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2030        if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2031                CERROR("device not attached or not set up (%s)\n",
2032                       lustre_cfg_string(lcfg, 1));
2033                return -EINVAL;
2034        }
2035
2036        spin_lock_init(&ec->ec_lock);
2037        INIT_LIST_HEAD (&ec->ec_objects);
2038        INIT_LIST_HEAD (&ec->ec_locks);
2039        ec->ec_unique = 0;
2040        ec->ec_nstripes = 0;
2041
2042        OBD_ALLOC(ocd, sizeof(*ocd));
2043        if (ocd == NULL) {
2044                CERROR("Can't alloc ocd connecting to %s\n",
2045                       lustre_cfg_string(lcfg, 1));
2046                return -ENOMEM;
2047        }
2048
2049        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2050                                 OBD_CONNECT_BRW_SIZE |
2051                                 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2052                                 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2053                                 OBD_CONNECT_FID;
2054        ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2055        ocd->ocd_version = LUSTRE_VERSION_CODE;
2056        ocd->ocd_group = FID_SEQ_ECHO;
2057
2058        rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2059        if (rc == 0) {
2060                /* Turn off pinger because it connects to tgt obd directly. */
2061                spin_lock(&tgt->obd_dev_lock);
2062                list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2063                spin_unlock(&tgt->obd_dev_lock);
2064        }
2065
2066        OBD_FREE(ocd, sizeof(*ocd));
2067
2068        if (rc != 0) {
2069                CERROR("fail to connect to device %s\n",
2070                       lustre_cfg_string(lcfg, 1));
2071                return rc;
2072        }
2073
2074        return rc;
2075}
2076
2077static int echo_client_cleanup(struct obd_device *obddev)
2078{
2079        struct echo_client_obd *ec = &obddev->u.echo_client;
2080        int rc;
2081
2082        if (!list_empty(&obddev->obd_exports)) {
2083                CERROR("still has clients!\n");
2084                return -EBUSY;
2085        }
2086
2087        LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2088        rc = obd_disconnect(ec->ec_exp);
2089        if (rc != 0)
2090                CERROR("fail to disconnect device: %d\n", rc);
2091
2092        return rc;
2093}
2094
2095static int echo_client_connect(const struct lu_env *env,
2096                               struct obd_export **exp,
2097                               struct obd_device *src, struct obd_uuid *cluuid,
2098                               struct obd_connect_data *data, void *localdata)
2099{
2100        int             rc;
2101        struct lustre_handle conn = { 0 };
2102
2103        rc = class_connect(&conn, src, cluuid);
2104        if (rc == 0) {
2105                *exp = class_conn2export(&conn);
2106        }
2107
2108        return rc;
2109}
2110
2111static int echo_client_disconnect(struct obd_export *exp)
2112{
2113        int                  rc;
2114
2115        if (exp == NULL) {
2116                rc = -EINVAL;
2117                goto out;
2118        }
2119
2120        rc = class_disconnect(exp);
2121        goto out;
2122 out:
2123        return rc;
2124}
2125
2126static struct obd_ops echo_client_obd_ops = {
2127        .o_owner       = THIS_MODULE,
2128        .o_iocontrol   = echo_client_iocontrol,
2129        .o_connect     = echo_client_connect,
2130        .o_disconnect  = echo_client_disconnect
2131};
2132
2133int echo_client_init(void)
2134{
2135        struct lprocfs_static_vars lvars = { NULL };
2136        int rc;
2137
2138        lprocfs_echo_init_vars(&lvars);
2139
2140        rc = lu_kmem_init(echo_caches);
2141        if (rc == 0) {
2142                rc = class_register_type(&echo_client_obd_ops, NULL,
2143                                         lvars.module_vars,
2144                                         LUSTRE_ECHO_CLIENT_NAME,
2145                                         &echo_device_type);
2146                if (rc)
2147                        lu_kmem_fini(echo_caches);
2148        }
2149        return rc;
2150}
2151
2152void echo_client_exit(void)
2153{
2154        class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2155        lu_kmem_fini(echo_caches);
2156}
2157
2158static int __init obdecho_init(void)
2159{
2160        struct lprocfs_static_vars lvars;
2161        int rc;
2162
2163        LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2164
2165        LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2166
2167        lprocfs_echo_init_vars(&lvars);
2168
2169
2170        rc = echo_client_init();
2171
2172        return rc;
2173}
2174
2175static void /*__exit*/ obdecho_exit(void)
2176{
2177        echo_client_exit();
2178
2179}
2180
2181MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2182MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2183MODULE_LICENSE("GPL");
2184MODULE_VERSION(LUSTRE_VERSION_STRING);
2185
2186module_init(obdecho_init);
2187module_exit(obdecho_exit);
2188
2189/** @} echo_client */
2190