linux/drivers/staging/lustre/lustre/obdecho/echo_client.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_ECHO
  38#include <linux/libcfs/libcfs.h>
  39
  40#include <obd.h>
  41#include <obd_support.h>
  42#include <obd_class.h>
  43#include <lustre_debug.h>
  44#include <lprocfs_status.h>
  45#include <cl_object.h>
  46#include <lustre_fid.h>
  47#include <lustre_acl.h>
  48#include <lustre_net.h>
  49#include <obd_lov.h>
  50
  51#include "echo_internal.h"
  52
  53/** \defgroup echo_client Echo Client
  54 * @{
  55 */
  56
  57struct echo_device {
  58        struct cl_device        ed_cl;
  59        struct echo_client_obd *ed_ec;
  60
  61        struct cl_site    ed_site_myself;
  62        struct cl_site   *ed_site;
  63        struct lu_device       *ed_next;
  64        int                  ed_next_islov;
  65        int                  ed_next_ismd;
  66        struct lu_client_seq   *ed_cl_seq;
  67};
  68
  69struct echo_object {
  70        struct cl_object        eo_cl;
  71        struct cl_object_header eo_hdr;
  72
  73        struct echo_device     *eo_dev;
  74        struct list_head              eo_obj_chain;
  75        struct lov_stripe_md   *eo_lsm;
  76        atomic_t            eo_npages;
  77        int                  eo_deleted;
  78};
  79
  80struct echo_object_conf {
  81        struct cl_object_conf  eoc_cl;
  82        struct lov_stripe_md **eoc_md;
  83};
  84
  85struct echo_page {
  86        struct cl_page_slice   ep_cl;
  87        struct mutex            ep_lock;
  88        struct page         *ep_vmpage;
  89};
  90
  91struct echo_lock {
  92        struct cl_lock_slice   el_cl;
  93        struct list_head             el_chain;
  94        struct echo_object    *el_object;
  95        __u64             el_cookie;
  96        atomic_t           el_refcount;
  97};
  98
  99struct echo_io {
 100        struct cl_io_slice     ei_cl;
 101};
 102
 103#if 0
 104struct echo_req {
 105        struct cl_req_slice er_cl;
 106};
 107#endif
 108
 109static int echo_client_setup(const struct lu_env *env,
 110                             struct obd_device *obddev,
 111                             struct lustre_cfg *lcfg);
 112static int echo_client_cleanup(struct obd_device *obddev);
 113
 114
 115/** \defgroup echo_helpers Helper functions
 116 * @{
 117 */
 118static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
 119{
 120        return container_of0(dev, struct echo_device, ed_cl);
 121}
 122
 123static inline struct cl_device *echo_dev2cl(struct echo_device *d)
 124{
 125        return &d->ed_cl;
 126}
 127
 128static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
 129{
 130        return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
 131}
 132
 133static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
 134{
 135        return &eco->eo_cl;
 136}
 137
 138static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
 139{
 140        return container_of(o, struct echo_object, eo_cl);
 141}
 142
 143static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
 144{
 145        return container_of(s, struct echo_page, ep_cl);
 146}
 147
 148static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
 149{
 150        return container_of(s, struct echo_lock, el_cl);
 151}
 152
 153static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
 154{
 155        return ecl->el_cl.cls_lock;
 156}
 157
 158static struct lu_context_key echo_thread_key;
 159static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
 160{
 161        struct echo_thread_info *info;
 162        info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
 163        LASSERT(info != NULL);
 164        return info;
 165}
 166
 167static inline
 168struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
 169{
 170        return container_of(c, struct echo_object_conf, eoc_cl);
 171}
 172
 173/** @} echo_helpers */
 174
 175static struct echo_object *cl_echo_object_find(struct echo_device *d,
 176                                               struct lov_stripe_md **lsm);
 177static int cl_echo_object_put(struct echo_object *eco);
 178static int cl_echo_enqueue   (struct echo_object *eco, obd_off start,
 179                              obd_off end, int mode, __u64 *cookie);
 180static int cl_echo_cancel    (struct echo_device *d, __u64 cookie);
 181static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
 182                              struct page **pages, int npages, int async);
 183
 184static struct echo_thread_info *echo_env_info(const struct lu_env *env);
 185
 186struct echo_thread_info {
 187        struct echo_object_conf eti_conf;
 188        struct lustre_md        eti_md;
 189
 190        struct cl_2queue        eti_queue;
 191        struct cl_io        eti_io;
 192        struct cl_lock_descr    eti_descr;
 193        struct lu_fid      eti_fid;
 194        struct lu_fid           eti_fid2;
 195        struct md_op_spec       eti_spec;
 196        struct lov_mds_md_v3    eti_lmm;
 197        struct lov_user_md_v3   eti_lum;
 198        struct md_attr    eti_ma;
 199        struct lu_name    eti_lname;
 200        /* per-thread values, can be re-used */
 201        void                    *eti_big_lmm;
 202        int                     eti_big_lmmsize;
 203        char                eti_name[20];
 204        struct lu_buf      eti_buf;
 205        char                eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
 206};
 207
 208/* No session used right now */
 209struct echo_session_info {
 210        unsigned long dummy;
 211};
 212
 213static struct kmem_cache *echo_lock_kmem;
 214static struct kmem_cache *echo_object_kmem;
 215static struct kmem_cache *echo_thread_kmem;
 216static struct kmem_cache *echo_session_kmem;
 217//static struct kmem_cache *echo_req_kmem;
 218
 219static struct lu_kmem_descr echo_caches[] = {
 220        {
 221                .ckd_cache = &echo_lock_kmem,
 222                .ckd_name  = "echo_lock_kmem",
 223                .ckd_size  = sizeof (struct echo_lock)
 224        },
 225        {
 226                .ckd_cache = &echo_object_kmem,
 227                .ckd_name  = "echo_object_kmem",
 228                .ckd_size  = sizeof (struct echo_object)
 229        },
 230        {
 231                .ckd_cache = &echo_thread_kmem,
 232                .ckd_name  = "echo_thread_kmem",
 233                .ckd_size  = sizeof (struct echo_thread_info)
 234        },
 235        {
 236                .ckd_cache = &echo_session_kmem,
 237                .ckd_name  = "echo_session_kmem",
 238                .ckd_size  = sizeof (struct echo_session_info)
 239        },
 240#if 0
 241        {
 242                .ckd_cache = &echo_req_kmem,
 243                .ckd_name  = "echo_req_kmem",
 244                .ckd_size  = sizeof (struct echo_req)
 245        },
 246#endif
 247        {
 248                .ckd_cache = NULL
 249        }
 250};
 251
 252/** \defgroup echo_page Page operations
 253 *
 254 * Echo page operations.
 255 *
 256 * @{
 257 */
 258static struct page *echo_page_vmpage(const struct lu_env *env,
 259                                    const struct cl_page_slice *slice)
 260{
 261        return cl2echo_page(slice)->ep_vmpage;
 262}
 263
 264static int echo_page_own(const struct lu_env *env,
 265                         const struct cl_page_slice *slice,
 266                         struct cl_io *io, int nonblock)
 267{
 268        struct echo_page *ep = cl2echo_page(slice);
 269
 270        if (!nonblock)
 271                mutex_lock(&ep->ep_lock);
 272        else if (!mutex_trylock(&ep->ep_lock))
 273                return -EAGAIN;
 274        return 0;
 275}
 276
 277static void echo_page_disown(const struct lu_env *env,
 278                             const struct cl_page_slice *slice,
 279                             struct cl_io *io)
 280{
 281        struct echo_page *ep = cl2echo_page(slice);
 282
 283        LASSERT(mutex_is_locked(&ep->ep_lock));
 284        mutex_unlock(&ep->ep_lock);
 285}
 286
 287static void echo_page_discard(const struct lu_env *env,
 288                              const struct cl_page_slice *slice,
 289                              struct cl_io *unused)
 290{
 291        cl_page_delete(env, slice->cpl_page);
 292}
 293
 294static int echo_page_is_vmlocked(const struct lu_env *env,
 295                                 const struct cl_page_slice *slice)
 296{
 297        if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
 298                return -EBUSY;
 299        return -ENODATA;
 300}
 301
 302static void echo_page_completion(const struct lu_env *env,
 303                                 const struct cl_page_slice *slice,
 304                                 int ioret)
 305{
 306        LASSERT(slice->cpl_page->cp_sync_io != NULL);
 307}
 308
 309static void echo_page_fini(const struct lu_env *env,
 310                           struct cl_page_slice *slice)
 311{
 312        struct echo_page *ep    = cl2echo_page(slice);
 313        struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
 314        struct page *vmpage      = ep->ep_vmpage;
 315        ENTRY;
 316
 317        atomic_dec(&eco->eo_npages);
 318        page_cache_release(vmpage);
 319        EXIT;
 320}
 321
 322static int echo_page_prep(const struct lu_env *env,
 323                          const struct cl_page_slice *slice,
 324                          struct cl_io *unused)
 325{
 326        return 0;
 327}
 328
 329static int echo_page_print(const struct lu_env *env,
 330                           const struct cl_page_slice *slice,
 331                           void *cookie, lu_printer_t printer)
 332{
 333        struct echo_page *ep = cl2echo_page(slice);
 334
 335        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
 336                   ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
 337        return 0;
 338}
 339
 340static const struct cl_page_operations echo_page_ops = {
 341        .cpo_own           = echo_page_own,
 342        .cpo_disown     = echo_page_disown,
 343        .cpo_discard       = echo_page_discard,
 344        .cpo_vmpage     = echo_page_vmpage,
 345        .cpo_fini         = echo_page_fini,
 346        .cpo_print       = echo_page_print,
 347        .cpo_is_vmlocked   = echo_page_is_vmlocked,
 348        .io = {
 349                [CRT_READ] = {
 350                        .cpo_prep       = echo_page_prep,
 351                        .cpo_completion  = echo_page_completion,
 352                },
 353                [CRT_WRITE] = {
 354                        .cpo_prep       = echo_page_prep,
 355                        .cpo_completion  = echo_page_completion,
 356                }
 357        }
 358};
 359/** @} echo_page */
 360
 361/** \defgroup echo_lock Locking
 362 *
 363 * echo lock operations
 364 *
 365 * @{
 366 */
 367static void echo_lock_fini(const struct lu_env *env,
 368                           struct cl_lock_slice *slice)
 369{
 370        struct echo_lock *ecl = cl2echo_lock(slice);
 371
 372        LASSERT(list_empty(&ecl->el_chain));
 373        OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
 374}
 375
 376static void echo_lock_delete(const struct lu_env *env,
 377                             const struct cl_lock_slice *slice)
 378{
 379        struct echo_lock *ecl      = cl2echo_lock(slice);
 380
 381        LASSERT(list_empty(&ecl->el_chain));
 382}
 383
 384static int echo_lock_fits_into(const struct lu_env *env,
 385                               const struct cl_lock_slice *slice,
 386                               const struct cl_lock_descr *need,
 387                               const struct cl_io *unused)
 388{
 389        return 1;
 390}
 391
 392static struct cl_lock_operations echo_lock_ops = {
 393        .clo_fini      = echo_lock_fini,
 394        .clo_delete    = echo_lock_delete,
 395        .clo_fits_into = echo_lock_fits_into
 396};
 397
 398/** @} echo_lock */
 399
 400/** \defgroup echo_cl_ops cl_object operations
 401 *
 402 * operations for cl_object
 403 *
 404 * @{
 405 */
 406static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
 407                        struct cl_page *page, struct page *vmpage)
 408{
 409        struct echo_page *ep = cl_object_page_slice(obj, page);
 410        struct echo_object *eco = cl2echo_obj(obj);
 411        ENTRY;
 412
 413        ep->ep_vmpage = vmpage;
 414        page_cache_get(vmpage);
 415        mutex_init(&ep->ep_lock);
 416        cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
 417        atomic_inc(&eco->eo_npages);
 418        RETURN(0);
 419}
 420
 421static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
 422                        struct cl_io *io)
 423{
 424        return 0;
 425}
 426
 427static int echo_lock_init(const struct lu_env *env,
 428                          struct cl_object *obj, struct cl_lock *lock,
 429                          const struct cl_io *unused)
 430{
 431        struct echo_lock *el;
 432        ENTRY;
 433
 434        OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, __GFP_IO);
 435        if (el != NULL) {
 436                cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
 437                el->el_object = cl2echo_obj(obj);
 438                INIT_LIST_HEAD(&el->el_chain);
 439                atomic_set(&el->el_refcount, 0);
 440        }
 441        RETURN(el == NULL ? -ENOMEM : 0);
 442}
 443
 444static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
 445                         const struct cl_object_conf *conf)
 446{
 447        return 0;
 448}
 449
 450static const struct cl_object_operations echo_cl_obj_ops = {
 451        .coo_page_init = echo_page_init,
 452        .coo_lock_init = echo_lock_init,
 453        .coo_io_init   = echo_io_init,
 454        .coo_conf_set  = echo_conf_set
 455};
 456/** @} echo_cl_ops */
 457
 458/** \defgroup echo_lu_ops lu_object operations
 459 *
 460 * operations for echo lu object.
 461 *
 462 * @{
 463 */
 464static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
 465                            const struct lu_object_conf *conf)
 466{
 467        struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
 468        struct echo_client_obd *ec     = ed->ed_ec;
 469        struct echo_object *eco = cl2echo_obj(lu2cl(obj));
 470        ENTRY;
 471
 472        if (ed->ed_next) {
 473                struct lu_object  *below;
 474                struct lu_device  *under;
 475
 476                under = ed->ed_next;
 477                below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
 478                                                        under);
 479                if (below == NULL)
 480                        RETURN(-ENOMEM);
 481                lu_object_add(obj, below);
 482        }
 483
 484        if (!ed->ed_next_ismd) {
 485                const struct cl_object_conf *cconf = lu2cl_conf(conf);
 486                struct echo_object_conf *econf = cl2echo_conf(cconf);
 487
 488                LASSERT(econf->eoc_md);
 489                eco->eo_lsm = *econf->eoc_md;
 490                /* clear the lsm pointer so that it won't get freed. */
 491                *econf->eoc_md = NULL;
 492        } else {
 493                eco->eo_lsm = NULL;
 494        }
 495
 496        eco->eo_dev = ed;
 497        atomic_set(&eco->eo_npages, 0);
 498        cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
 499
 500        spin_lock(&ec->ec_lock);
 501        list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
 502        spin_unlock(&ec->ec_lock);
 503
 504        RETURN(0);
 505}
 506
 507/* taken from osc_unpackmd() */
 508static int echo_alloc_memmd(struct echo_device *ed,
 509                            struct lov_stripe_md **lsmp)
 510{
 511        int lsm_size;
 512
 513        ENTRY;
 514
 515        /* If export is lov/osc then use their obd method */
 516        if (ed->ed_next != NULL)
 517                return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
 518        /* OFD has no unpackmd method, do everything here */
 519        lsm_size = lov_stripe_md_size(1);
 520
 521        LASSERT(*lsmp == NULL);
 522        OBD_ALLOC(*lsmp, lsm_size);
 523        if (*lsmp == NULL)
 524                RETURN(-ENOMEM);
 525
 526        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 527        if ((*lsmp)->lsm_oinfo[0] == NULL) {
 528                OBD_FREE(*lsmp, lsm_size);
 529                RETURN(-ENOMEM);
 530        }
 531
 532        loi_init((*lsmp)->lsm_oinfo[0]);
 533        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
 534        ostid_set_seq_echo(&(*lsmp)->lsm_oi);
 535
 536        RETURN(lsm_size);
 537}
 538
 539static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
 540{
 541        int lsm_size;
 542
 543        ENTRY;
 544
 545        /* If export is lov/osc then use their obd method */
 546        if (ed->ed_next != NULL)
 547                return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
 548        /* OFD has no unpackmd method, do everything here */
 549        lsm_size = lov_stripe_md_size(1);
 550
 551        LASSERT(*lsmp != NULL);
 552        OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 553        OBD_FREE(*lsmp, lsm_size);
 554        *lsmp = NULL;
 555        RETURN(0);
 556}
 557
 558static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
 559{
 560        struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
 561        struct echo_client_obd *ec = eco->eo_dev->ed_ec;
 562        ENTRY;
 563
 564        LASSERT(atomic_read(&eco->eo_npages) == 0);
 565
 566        spin_lock(&ec->ec_lock);
 567        list_del_init(&eco->eo_obj_chain);
 568        spin_unlock(&ec->ec_lock);
 569
 570        lu_object_fini(obj);
 571        lu_object_header_fini(obj->lo_header);
 572
 573        if (eco->eo_lsm)
 574                echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
 575        OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
 576        EXIT;
 577}
 578
 579static int echo_object_print(const struct lu_env *env, void *cookie,
 580                            lu_printer_t p, const struct lu_object *o)
 581{
 582        struct echo_object *obj = cl2echo_obj(lu2cl(o));
 583
 584        return (*p)(env, cookie, "echoclient-object@%p", obj);
 585}
 586
 587static const struct lu_object_operations echo_lu_obj_ops = {
 588        .loo_object_init      = echo_object_init,
 589        .loo_object_delete    = NULL,
 590        .loo_object_release   = NULL,
 591        .loo_object_free      = echo_object_free,
 592        .loo_object_print     = echo_object_print,
 593        .loo_object_invariant = NULL
 594};
 595/** @} echo_lu_ops */
 596
 597/** \defgroup echo_lu_dev_ops  lu_device operations
 598 *
 599 * Operations for echo lu device.
 600 *
 601 * @{
 602 */
 603static struct lu_object *echo_object_alloc(const struct lu_env *env,
 604                                           const struct lu_object_header *hdr,
 605                                           struct lu_device *dev)
 606{
 607        struct echo_object *eco;
 608        struct lu_object *obj = NULL;
 609        ENTRY;
 610
 611        /* we're the top dev. */
 612        LASSERT(hdr == NULL);
 613        OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, __GFP_IO);
 614        if (eco != NULL) {
 615                struct cl_object_header *hdr = &eco->eo_hdr;
 616
 617                obj = &echo_obj2cl(eco)->co_lu;
 618                cl_object_header_init(hdr);
 619                lu_object_init(obj, &hdr->coh_lu, dev);
 620                lu_object_add_top(&hdr->coh_lu, obj);
 621
 622                eco->eo_cl.co_ops = &echo_cl_obj_ops;
 623                obj->lo_ops       = &echo_lu_obj_ops;
 624        }
 625        RETURN(obj);
 626}
 627
 628static struct lu_device_operations echo_device_lu_ops = {
 629        .ldo_object_alloc   = echo_object_alloc,
 630};
 631
 632/** @} echo_lu_dev_ops */
 633
 634static struct cl_device_operations echo_device_cl_ops = {
 635};
 636
 637/** \defgroup echo_init Setup and teardown
 638 *
 639 * Init and fini functions for echo client.
 640 *
 641 * @{
 642 */
 643static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
 644{
 645        struct cl_site *site = &ed->ed_site_myself;
 646        int rc;
 647
 648        /* initialize site */
 649        rc = cl_site_init(site, &ed->ed_cl);
 650        if (rc) {
 651                CERROR("Cannot initilize site for echo client(%d)\n", rc);
 652                return rc;
 653        }
 654
 655        rc = lu_site_init_finish(&site->cs_lu);
 656        if (rc)
 657                return rc;
 658
 659        ed->ed_site = site;
 660        return 0;
 661}
 662
 663static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
 664{
 665        if (ed->ed_site) {
 666                if (!ed->ed_next_ismd)
 667                        cl_site_fini(ed->ed_site);
 668                ed->ed_site = NULL;
 669        }
 670}
 671
 672static void *echo_thread_key_init(const struct lu_context *ctx,
 673                          struct lu_context_key *key)
 674{
 675        struct echo_thread_info *info;
 676
 677        OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, __GFP_IO);
 678        if (info == NULL)
 679                info = ERR_PTR(-ENOMEM);
 680        return info;
 681}
 682
 683static void echo_thread_key_fini(const struct lu_context *ctx,
 684                         struct lu_context_key *key, void *data)
 685{
 686        struct echo_thread_info *info = data;
 687        OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
 688}
 689
 690static void echo_thread_key_exit(const struct lu_context *ctx,
 691                         struct lu_context_key *key, void *data)
 692{
 693}
 694
 695static struct lu_context_key echo_thread_key = {
 696        .lct_tags = LCT_CL_THREAD,
 697        .lct_init = echo_thread_key_init,
 698        .lct_fini = echo_thread_key_fini,
 699        .lct_exit = echo_thread_key_exit
 700};
 701
 702static void *echo_session_key_init(const struct lu_context *ctx,
 703                                  struct lu_context_key *key)
 704{
 705        struct echo_session_info *session;
 706
 707        OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, __GFP_IO);
 708        if (session == NULL)
 709                session = ERR_PTR(-ENOMEM);
 710        return session;
 711}
 712
 713static void echo_session_key_fini(const struct lu_context *ctx,
 714                                 struct lu_context_key *key, void *data)
 715{
 716        struct echo_session_info *session = data;
 717        OBD_SLAB_FREE_PTR(session, echo_session_kmem);
 718}
 719
 720static void echo_session_key_exit(const struct lu_context *ctx,
 721                                 struct lu_context_key *key, void *data)
 722{
 723}
 724
 725static struct lu_context_key echo_session_key = {
 726        .lct_tags = LCT_SESSION,
 727        .lct_init = echo_session_key_init,
 728        .lct_fini = echo_session_key_fini,
 729        .lct_exit = echo_session_key_exit
 730};
 731
 732LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
 733
 734#define ECHO_SEQ_WIDTH 0xffffffff
 735static int echo_fid_init(struct echo_device *ed, char *obd_name,
 736                         struct seq_server_site *ss)
 737{
 738        char *prefix;
 739        int rc;
 740        ENTRY;
 741
 742        OBD_ALLOC_PTR(ed->ed_cl_seq);
 743        if (ed->ed_cl_seq == NULL)
 744                RETURN(-ENOMEM);
 745
 746        OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
 747        if (prefix == NULL)
 748                GOTO(out_free_seq, rc = -ENOMEM);
 749
 750        snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
 751
 752        /* Init client side sequence-manager */
 753        rc = seq_client_init(ed->ed_cl_seq, NULL,
 754                             LUSTRE_SEQ_METADATA,
 755                             prefix, ss->ss_server_seq);
 756        ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
 757        OBD_FREE(prefix, MAX_OBD_NAME + 5);
 758        if (rc)
 759                GOTO(out_free_seq, rc);
 760
 761        RETURN(0);
 762
 763out_free_seq:
 764        OBD_FREE_PTR(ed->ed_cl_seq);
 765        ed->ed_cl_seq = NULL;
 766        RETURN(rc);
 767}
 768
 769static int echo_fid_fini(struct obd_device *obddev)
 770{
 771        struct echo_device *ed = obd2echo_dev(obddev);
 772        ENTRY;
 773
 774        if (ed->ed_cl_seq != NULL) {
 775                seq_client_fini(ed->ed_cl_seq);
 776                OBD_FREE_PTR(ed->ed_cl_seq);
 777                ed->ed_cl_seq = NULL;
 778        }
 779
 780        RETURN(0);
 781}
 782
 783static struct lu_device *echo_device_alloc(const struct lu_env *env,
 784                                           struct lu_device_type *t,
 785                                           struct lustre_cfg *cfg)
 786{
 787        struct lu_device   *next;
 788        struct echo_device *ed;
 789        struct cl_device   *cd;
 790        struct obd_device  *obd = NULL; /* to keep compiler happy */
 791        struct obd_device  *tgt;
 792        const char *tgt_type_name;
 793        int rc;
 794        int cleanup = 0;
 795        ENTRY;
 796
 797        OBD_ALLOC_PTR(ed);
 798        if (ed == NULL)
 799                GOTO(out, rc = -ENOMEM);
 800
 801        cleanup = 1;
 802        cd = &ed->ed_cl;
 803        rc = cl_device_init(cd, t);
 804        if (rc)
 805                GOTO(out, rc);
 806
 807        cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
 808        cd->cd_ops = &echo_device_cl_ops;
 809
 810        cleanup = 2;
 811        obd = class_name2obd(lustre_cfg_string(cfg, 0));
 812        LASSERT(obd != NULL);
 813        LASSERT(env != NULL);
 814
 815        tgt = class_name2obd(lustre_cfg_string(cfg, 1));
 816        if (tgt == NULL) {
 817                CERROR("Can not find tgt device %s\n",
 818                        lustre_cfg_string(cfg, 1));
 819                GOTO(out, rc = -ENODEV);
 820        }
 821
 822        next = tgt->obd_lu_dev;
 823        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
 824                ed->ed_next_ismd = 1;
 825        } else {
 826                ed->ed_next_ismd = 0;
 827                rc = echo_site_init(env, ed);
 828                if (rc)
 829                        GOTO(out, rc);
 830        }
 831        cleanup = 3;
 832
 833        rc = echo_client_setup(env, obd, cfg);
 834        if (rc)
 835                GOTO(out, rc);
 836
 837        ed->ed_ec = &obd->u.echo_client;
 838        cleanup = 4;
 839
 840        if (ed->ed_next_ismd) {
 841                /* Suppose to connect to some Metadata layer */
 842                struct lu_site *ls;
 843                struct lu_device *ld;
 844                int    found = 0;
 845
 846                if (next == NULL) {
 847                        CERROR("%s is not lu device type!\n",
 848                               lustre_cfg_string(cfg, 1));
 849                        GOTO(out, rc = -EINVAL);
 850                }
 851
 852                tgt_type_name = lustre_cfg_string(cfg, 2);
 853                if (!tgt_type_name) {
 854                        CERROR("%s no type name for echo %s setup\n",
 855                                lustre_cfg_string(cfg, 1),
 856                                tgt->obd_type->typ_name);
 857                        GOTO(out, rc = -EINVAL);
 858                }
 859
 860                ls = next->ld_site;
 861
 862                spin_lock(&ls->ls_ld_lock);
 863                list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
 864                        if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
 865                                found = 1;
 866                                break;
 867                        }
 868                }
 869                spin_unlock(&ls->ls_ld_lock);
 870
 871                if (found == 0) {
 872                        CERROR("%s is not lu device type!\n",
 873                               lustre_cfg_string(cfg, 1));
 874                        GOTO(out, rc = -EINVAL);
 875                }
 876
 877                next = ld;
 878                /* For MD echo client, it will use the site in MDS stack */
 879                ed->ed_site_myself.cs_lu = *ls;
 880                ed->ed_site = &ed->ed_site_myself;
 881                ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
 882                rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
 883                if (rc) {
 884                        CERROR("echo fid init error %d\n", rc);
 885                        GOTO(out, rc);
 886                }
 887        } else {
 888                 /* if echo client is to be stacked upon ost device, the next is
 889                  * NULL since ost is not a clio device so far */
 890                if (next != NULL && !lu_device_is_cl(next))
 891                        next = NULL;
 892
 893                tgt_type_name = tgt->obd_type->typ_name;
 894                if (next != NULL) {
 895                        LASSERT(next != NULL);
 896                        if (next->ld_site != NULL)
 897                                GOTO(out, rc = -EBUSY);
 898
 899                        next->ld_site = &ed->ed_site->cs_lu;
 900                        rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
 901                                                     next->ld_type->ldt_name,
 902                                                     NULL);
 903                        if (rc)
 904                                GOTO(out, rc);
 905
 906                        /* Tricky case, I have to determine the obd type since
 907                         * CLIO uses the different parameters to initialize
 908                         * objects for lov & osc. */
 909                        if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
 910                                ed->ed_next_islov = 1;
 911                        else
 912                                LASSERT(strcmp(tgt_type_name,
 913                                               LUSTRE_OSC_NAME) == 0);
 914                } else
 915                        LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
 916        }
 917
 918        ed->ed_next = next;
 919        RETURN(&cd->cd_lu_dev);
 920out:
 921        switch(cleanup) {
 922        case 4: {
 923                int rc2;
 924                rc2 = echo_client_cleanup(obd);
 925                if (rc2)
 926                        CERROR("Cleanup obd device %s error(%d)\n",
 927                               obd->obd_name, rc2);
 928        }
 929
 930        case 3:
 931                echo_site_fini(env, ed);
 932        case 2:
 933                cl_device_fini(&ed->ed_cl);
 934        case 1:
 935                OBD_FREE_PTR(ed);
 936        case 0:
 937        default:
 938                break;
 939        }
 940        return(ERR_PTR(rc));
 941}
 942
 943static int echo_device_init(const struct lu_env *env, struct lu_device *d,
 944                          const char *name, struct lu_device *next)
 945{
 946        LBUG();
 947        return 0;
 948}
 949
 950static struct lu_device *echo_device_fini(const struct lu_env *env,
 951                                          struct lu_device *d)
 952{
 953        struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
 954        struct lu_device *next = ed->ed_next;
 955
 956        while (next && !ed->ed_next_ismd)
 957                next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
 958        return NULL;
 959}
 960
 961static void echo_lock_release(const struct lu_env *env,
 962                              struct echo_lock *ecl,
 963                              int still_used)
 964{
 965        struct cl_lock *clk = echo_lock2cl(ecl);
 966
 967        cl_lock_get(clk);
 968        cl_unuse(env, clk);
 969        cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
 970        if (!still_used) {
 971                cl_lock_mutex_get(env, clk);
 972                cl_lock_cancel(env, clk);
 973                cl_lock_delete(env, clk);
 974                cl_lock_mutex_put(env, clk);
 975        }
 976        cl_lock_put(env, clk);
 977}
 978
 979static struct lu_device *echo_device_free(const struct lu_env *env,
 980                                          struct lu_device *d)
 981{
 982        struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
 983        struct echo_client_obd *ec   = ed->ed_ec;
 984        struct echo_object     *eco;
 985        struct lu_device       *next = ed->ed_next;
 986
 987        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
 988               ed, next);
 989
 990        lu_site_purge(env, &ed->ed_site->cs_lu, -1);
 991
 992        /* check if there are objects still alive.
 993         * It shouldn't have any object because lu_site_purge would cleanup
 994         * all of cached objects. Anyway, probably the echo device is being
 995         * parallelly accessed.
 996         */
 997        spin_lock(&ec->ec_lock);
 998        list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
 999                eco->eo_deleted = 1;
1000        spin_unlock(&ec->ec_lock);
1001
1002        /* purge again */
1003        lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1004
1005        CDEBUG(D_INFO,
1006               "Waiting for the reference of echo object to be dropped\n");
1007
1008        /* Wait for the last reference to be dropped. */
1009        spin_lock(&ec->ec_lock);
1010        while (!list_empty(&ec->ec_objects)) {
1011                spin_unlock(&ec->ec_lock);
1012                CERROR("echo_client still has objects at cleanup time, "
1013                       "wait for 1 second\n");
1014                schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1015                                                   cfs_time_seconds(1));
1016                lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1017                spin_lock(&ec->ec_lock);
1018        }
1019        spin_unlock(&ec->ec_lock);
1020
1021        LASSERT(list_empty(&ec->ec_locks));
1022
1023        CDEBUG(D_INFO, "No object exists, exiting...\n");
1024
1025        echo_client_cleanup(d->ld_obd);
1026        echo_fid_fini(d->ld_obd);
1027        while (next && !ed->ed_next_ismd)
1028                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1029
1030        LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
1031        echo_site_fini(env, ed);
1032        cl_device_fini(&ed->ed_cl);
1033        OBD_FREE_PTR(ed);
1034
1035        return NULL;
1036}
1037
1038static const struct lu_device_type_operations echo_device_type_ops = {
1039        .ldto_init = echo_type_init,
1040        .ldto_fini = echo_type_fini,
1041
1042        .ldto_start = echo_type_start,
1043        .ldto_stop  = echo_type_stop,
1044
1045        .ldto_device_alloc = echo_device_alloc,
1046        .ldto_device_free  = echo_device_free,
1047        .ldto_device_init  = echo_device_init,
1048        .ldto_device_fini  = echo_device_fini
1049};
1050
1051static struct lu_device_type echo_device_type = {
1052        .ldt_tags     = LU_DEVICE_CL,
1053        .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
1054        .ldt_ops      = &echo_device_type_ops,
1055        .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1056};
1057/** @} echo_init */
1058
1059/** \defgroup echo_exports Exported operations
1060 *
1061 * exporting functions to echo client
1062 *
1063 * @{
1064 */
1065
1066/* Interfaces to echo client obd device */
1067static struct echo_object *cl_echo_object_find(struct echo_device *d,
1068                                               struct lov_stripe_md **lsmp)
1069{
1070        struct lu_env *env;
1071        struct echo_thread_info *info;
1072        struct echo_object_conf *conf;
1073        struct lov_stripe_md    *lsm;
1074        struct echo_object *eco;
1075        struct cl_object   *obj;
1076        struct lu_fid *fid;
1077        int refcheck;
1078        int rc;
1079        ENTRY;
1080
1081        LASSERT(lsmp);
1082        lsm = *lsmp;
1083        LASSERT(lsm);
1084        LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
1085        LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
1086                 POSTID(&lsm->lsm_oi));
1087
1088        /* Never return an object if the obd is to be freed. */
1089        if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1090                RETURN(ERR_PTR(-ENODEV));
1091
1092        env = cl_env_get(&refcheck);
1093        if (IS_ERR(env))
1094                RETURN((void *)env);
1095
1096        info = echo_env_info(env);
1097        conf = &info->eti_conf;
1098        if (d->ed_next) {
1099                if (!d->ed_next_islov) {
1100                        struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
1101                        LASSERT(oinfo != NULL);
1102                        oinfo->loi_oi = lsm->lsm_oi;
1103                        conf->eoc_cl.u.coc_oinfo = oinfo;
1104                } else {
1105                        struct lustre_md *md;
1106                        md = &info->eti_md;
1107                        memset(md, 0, sizeof *md);
1108                        md->lsm = lsm;
1109                        conf->eoc_cl.u.coc_md = md;
1110                }
1111        }
1112        conf->eoc_md = lsmp;
1113
1114        fid  = &info->eti_fid;
1115        rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
1116        if (rc != 0)
1117                GOTO(out, eco = ERR_PTR(rc));
1118
1119        /* In the function below, .hs_keycmp resolves to
1120         * lu_obj_hop_keycmp() */
1121        /* coverity[overrun-buffer-val] */
1122        obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1123        if (IS_ERR(obj))
1124                GOTO(out, eco = (void*)obj);
1125
1126        eco = cl2echo_obj(obj);
1127        if (eco->eo_deleted) {
1128                cl_object_put(env, obj);
1129                eco = ERR_PTR(-EAGAIN);
1130        }
1131
1132out:
1133        cl_env_put(env, &refcheck);
1134        RETURN(eco);
1135}
1136
1137static int cl_echo_object_put(struct echo_object *eco)
1138{
1139        struct lu_env *env;
1140        struct cl_object *obj = echo_obj2cl(eco);
1141        int refcheck;
1142        ENTRY;
1143
1144        env = cl_env_get(&refcheck);
1145        if (IS_ERR(env))
1146                RETURN(PTR_ERR(env));
1147
1148        /* an external function to kill an object? */
1149        if (eco->eo_deleted) {
1150                struct lu_object_header *loh = obj->co_lu.lo_header;
1151                LASSERT(&eco->eo_hdr == luh2coh(loh));
1152                set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1153        }
1154
1155        cl_object_put(env, obj);
1156        cl_env_put(env, &refcheck);
1157        RETURN(0);
1158}
1159
1160static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1161                            obd_off start, obd_off end, int mode,
1162                            __u64 *cookie , __u32 enqflags)
1163{
1164        struct cl_io *io;
1165        struct cl_lock *lck;
1166        struct cl_object *obj;
1167        struct cl_lock_descr *descr;
1168        struct echo_thread_info *info;
1169        int rc = -ENOMEM;
1170        ENTRY;
1171
1172        info = echo_env_info(env);
1173        io = &info->eti_io;
1174        descr = &info->eti_descr;
1175        obj = echo_obj2cl(eco);
1176
1177        descr->cld_obj   = obj;
1178        descr->cld_start = cl_index(obj, start);
1179        descr->cld_end   = cl_index(obj, end);
1180        descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1181        descr->cld_enq_flags = enqflags;
1182        io->ci_obj = obj;
1183
1184        lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1185        if (lck) {
1186                struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1187                struct echo_lock *el;
1188
1189                rc = cl_wait(env, lck);
1190                if (rc == 0) {
1191                        el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1192                        spin_lock(&ec->ec_lock);
1193                        if (list_empty(&el->el_chain)) {
1194                                list_add(&el->el_chain, &ec->ec_locks);
1195                                el->el_cookie = ++ec->ec_unique;
1196                        }
1197                        atomic_inc(&el->el_refcount);
1198                        *cookie = el->el_cookie;
1199                        spin_unlock(&ec->ec_lock);
1200                } else {
1201                        cl_lock_release(env, lck, "ec enqueue", current);
1202                }
1203        }
1204        RETURN(rc);
1205}
1206
1207static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end,
1208                           int mode, __u64 *cookie)
1209{
1210        struct echo_thread_info *info;
1211        struct lu_env *env;
1212        struct cl_io *io;
1213        int refcheck;
1214        int result;
1215        ENTRY;
1216
1217        env = cl_env_get(&refcheck);
1218        if (IS_ERR(env))
1219                RETURN(PTR_ERR(env));
1220
1221        info = echo_env_info(env);
1222        io = &info->eti_io;
1223
1224        io->ci_ignore_layout = 1;
1225        result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1226        if (result < 0)
1227                GOTO(out, result);
1228        LASSERT(result == 0);
1229
1230        result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1231        cl_io_fini(env, io);
1232
1233        EXIT;
1234out:
1235        cl_env_put(env, &refcheck);
1236        return result;
1237}
1238
1239static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1240                           __u64 cookie)
1241{
1242        struct echo_client_obd *ec = ed->ed_ec;
1243        struct echo_lock       *ecl = NULL;
1244        struct list_head             *el;
1245        int found = 0, still_used = 0;
1246        ENTRY;
1247
1248        LASSERT(ec != NULL);
1249        spin_lock(&ec->ec_lock);
1250        list_for_each (el, &ec->ec_locks) {
1251                ecl = list_entry (el, struct echo_lock, el_chain);
1252                CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
1253                found = (ecl->el_cookie == cookie);
1254                if (found) {
1255                        if (atomic_dec_and_test(&ecl->el_refcount))
1256                                list_del_init(&ecl->el_chain);
1257                        else
1258                                still_used = 1;
1259                        break;
1260                }
1261        }
1262        spin_unlock(&ec->ec_lock);
1263
1264        if (!found)
1265                RETURN(-ENOENT);
1266
1267        echo_lock_release(env, ecl, still_used);
1268        RETURN(0);
1269}
1270
1271static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1272{
1273        struct lu_env *env;
1274        int refcheck;
1275        int rc;
1276        ENTRY;
1277
1278        env = cl_env_get(&refcheck);
1279        if (IS_ERR(env))
1280                RETURN(PTR_ERR(env));
1281
1282        rc = cl_echo_cancel0(env, ed, cookie);
1283
1284        cl_env_put(env, &refcheck);
1285        RETURN(rc);
1286}
1287
1288static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1289                             enum cl_req_type unused, struct cl_2queue *queue)
1290{
1291        struct cl_page *clp;
1292        struct cl_page *temp;
1293        int result = 0;
1294        ENTRY;
1295
1296        cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1297                int rc;
1298                rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1299                if (rc == 0)
1300                        continue;
1301                result = result ?: rc;
1302        }
1303        RETURN(result);
1304}
1305
1306static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
1307                              struct page **pages, int npages, int async)
1308{
1309        struct lu_env      *env;
1310        struct echo_thread_info *info;
1311        struct cl_object        *obj = echo_obj2cl(eco);
1312        struct echo_device      *ed  = eco->eo_dev;
1313        struct cl_2queue        *queue;
1314        struct cl_io        *io;
1315        struct cl_page    *clp;
1316        struct lustre_handle    lh = { 0 };
1317        int page_size = cl_page_size(obj);
1318        int refcheck;
1319        int rc;
1320        int i;
1321        ENTRY;
1322
1323        LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1324        LASSERT(ed->ed_next != NULL);
1325        env = cl_env_get(&refcheck);
1326        if (IS_ERR(env))
1327                RETURN(PTR_ERR(env));
1328
1329        info    = echo_env_info(env);
1330        io      = &info->eti_io;
1331        queue   = &info->eti_queue;
1332
1333        cl_2queue_init(queue);
1334
1335        io->ci_ignore_layout = 1;
1336        rc = cl_io_init(env, io, CIT_MISC, obj);
1337        if (rc < 0)
1338                GOTO(out, rc);
1339        LASSERT(rc == 0);
1340
1341
1342        rc = cl_echo_enqueue0(env, eco, offset,
1343                              offset + npages * PAGE_CACHE_SIZE - 1,
1344                              rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1345                              CEF_NEVER);
1346        if (rc < 0)
1347                GOTO(error_lock, rc);
1348
1349        for (i = 0; i < npages; i++) {
1350                LASSERT(pages[i]);
1351                clp = cl_page_find(env, obj, cl_index(obj, offset),
1352                                   pages[i], CPT_TRANSIENT);
1353                if (IS_ERR(clp)) {
1354                        rc = PTR_ERR(clp);
1355                        break;
1356                }
1357                LASSERT(clp->cp_type == CPT_TRANSIENT);
1358
1359                rc = cl_page_own(env, io, clp);
1360                if (rc) {
1361                        LASSERT(clp->cp_state == CPS_FREEING);
1362                        cl_page_put(env, clp);
1363                        break;
1364                }
1365
1366                cl_2queue_add(queue, clp);
1367
1368                /* drop the reference count for cl_page_find, so that the page
1369                 * will be freed in cl_2queue_fini. */
1370                cl_page_put(env, clp);
1371                cl_page_clip(env, clp, 0, page_size);
1372
1373                offset += page_size;
1374        }
1375
1376        if (rc == 0) {
1377                enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1378
1379                async = async && (typ == CRT_WRITE);
1380                if (async)
1381                        rc = cl_echo_async_brw(env, io, typ, queue);
1382                else
1383                        rc = cl_io_submit_sync(env, io, typ, queue, 0);
1384                CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1385                       async ? "async" : "sync", rc);
1386        }
1387
1388        cl_echo_cancel0(env, ed, lh.cookie);
1389        EXIT;
1390error_lock:
1391        cl_2queue_discard(env, io, queue);
1392        cl_2queue_disown(env, io, queue);
1393        cl_2queue_fini(env, queue);
1394        cl_io_fini(env, io);
1395out:
1396        cl_env_put(env, &refcheck);
1397        return rc;
1398}
1399/** @} echo_exports */
1400
1401
/* File-scope obd_id, zero-initialised as a static object.
 * NOTE(review): its readers and writers are outside this excerpt; confirm
 * how it is updated and whether concurrent access needs protection. */
static obd_id last_object_id;
1403
1404static int
1405echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1406{
1407        struct lov_stripe_md *ulsm = _ulsm;
1408        int nob, i;
1409
1410        nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1411        if (nob > ulsm_nob)
1412                return (-EINVAL);
1413
1414        if (copy_to_user (ulsm, lsm, sizeof(ulsm)))
1415                return (-EFAULT);
1416
1417        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1418                if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
1419                                      sizeof(lsm->lsm_oinfo[0])))
1420                        return (-EFAULT);
1421        }
1422        return 0;
1423}
1424
1425static int
1426echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
1427                 void *ulsm, int ulsm_nob)
1428{
1429        struct echo_client_obd *ec = ed->ed_ec;
1430        int                  i;
1431
1432        if (ulsm_nob < sizeof (*lsm))
1433                return (-EINVAL);
1434
1435        if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
1436                return (-EFAULT);
1437
1438        if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1439            lsm->lsm_magic != LOV_MAGIC ||
1440            (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1441            ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1442                return (-EINVAL);
1443
1444
1445        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1446                if (copy_from_user(lsm->lsm_oinfo[i],
1447                                       ((struct lov_stripe_md *)ulsm)-> \
1448                                       lsm_oinfo[i],
1449                                       sizeof(lsm->lsm_oinfo[0])))
1450                        return (-EFAULT);
1451        }
1452        return (0);
1453}
1454
1455static inline void echo_md_build_name(struct lu_name *lname, char *name,
1456                                      __u64 id)
1457{
1458        sprintf(name, LPU64, id);
1459        lname->ln_name = name;
1460        lname->ln_namelen = strlen(name);
1461}
1462
1463/* similar to mdt_attr_get_complex */
1464static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1465                            struct md_attr *ma)
1466{
1467        struct echo_thread_info *info = echo_env_info(env);
1468        int                      rc;
1469
1470        ENTRY;
1471
1472        LASSERT(ma->ma_lmm_size > 0);
1473
1474        rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1475        if (rc < 0)
1476                RETURN(rc);
1477
1478        /* big_lmm may need to be grown */
1479        if (info->eti_big_lmmsize < rc) {
1480                int size = size_roundup_power2(rc);
1481
1482                if (info->eti_big_lmmsize > 0) {
1483                        /* free old buffer */
1484                        LASSERT(info->eti_big_lmm);
1485                        OBD_FREE_LARGE(info->eti_big_lmm,
1486                                       info->eti_big_lmmsize);
1487                        info->eti_big_lmm = NULL;
1488                        info->eti_big_lmmsize = 0;
1489                }
1490
1491                OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1492                if (info->eti_big_lmm == NULL)
1493                        RETURN(-ENOMEM);
1494                info->eti_big_lmmsize = size;
1495        }
1496        LASSERT(info->eti_big_lmmsize >= rc);
1497
1498        info->eti_buf.lb_buf = info->eti_big_lmm;
1499        info->eti_buf.lb_len = info->eti_big_lmmsize;
1500        rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1501        if (rc < 0)
1502                RETURN(rc);
1503
1504        ma->ma_valid |= MA_LOV;
1505        ma->ma_lmm = info->eti_big_lmm;
1506        ma->ma_lmm_size = rc;
1507
1508        RETURN(0);
1509}
1510
1511int echo_attr_get_complex(const struct lu_env *env, struct md_object *next,
1512                          struct md_attr *ma)
1513{
1514        struct echo_thread_info *info = echo_env_info(env);
1515        struct lu_buf           *buf = &info->eti_buf;
1516        umode_t          mode = lu_object_attr(&next->mo_lu);
1517        int                      need = ma->ma_need;
1518        int                      rc = 0, rc2;
1519
1520        ENTRY;
1521
1522        ma->ma_valid = 0;
1523
1524        if (need & MA_INODE) {
1525                ma->ma_need = MA_INODE;
1526                rc = mo_attr_get(env, next, ma);
1527                if (rc)
1528                        GOTO(out, rc);
1529                ma->ma_valid |= MA_INODE;
1530        }
1531
1532        if (need & MA_LOV) {
1533                if (S_ISREG(mode) || S_ISDIR(mode)) {
1534                        LASSERT(ma->ma_lmm_size > 0);
1535                        buf->lb_buf = ma->ma_lmm;
1536                        buf->lb_len = ma->ma_lmm_size;
1537                        rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1538                        if (rc2 > 0) {
1539                                ma->ma_lmm_size = rc2;
1540                                ma->ma_valid |= MA_LOV;
1541                        } else if (rc2 == -ENODATA) {
1542                                /* no LOV EA */
1543                                ma->ma_lmm_size = 0;
1544                        } else if (rc2 == -ERANGE) {
1545                                rc2 = echo_big_lmm_get(env, next, ma);
1546                                if (rc2 < 0)
1547                                        GOTO(out, rc = rc2);
1548                        } else {
1549                                GOTO(out, rc = rc2);
1550                        }
1551                }
1552        }
1553
1554#ifdef CONFIG_FS_POSIX_ACL
1555        if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1556                buf->lb_buf = ma->ma_acl;
1557                buf->lb_len = ma->ma_acl_size;
1558                rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1559                if (rc2 > 0) {
1560                        ma->ma_acl_size = rc2;
1561                        ma->ma_valid |= MA_ACL_DEF;
1562                } else if (rc2 == -ENODATA) {
1563                        /* no ACLs */
1564                        ma->ma_acl_size = 0;
1565                } else {
1566                        GOTO(out, rc = rc2);
1567                }
1568        }
1569#endif
1570out:
1571        ma->ma_need = need;
1572        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
1573               rc, ma->ma_valid, ma->ma_lmm);
1574        RETURN(rc);
1575}
1576
1577static int
1578echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1579                        struct md_object *parent, struct lu_fid *fid,
1580                        struct lu_name *lname, struct md_op_spec *spec,
1581                        struct md_attr *ma)
1582{
1583        struct lu_object        *ec_child, *child;
1584        struct lu_device        *ld = ed->ed_next;
1585        struct echo_thread_info *info = echo_env_info(env);
1586        struct lu_fid           *fid2 = &info->eti_fid2;
1587        struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
1588        int                      rc;
1589
1590        ENTRY;
1591
1592        rc = mdo_lookup(env, parent, lname, fid2, spec);
1593        if (rc == 0)
1594                return -EEXIST;
1595        else if (rc != -ENOENT)
1596                return rc;
1597
1598        ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1599                                     fid, &conf);
1600        if (IS_ERR(ec_child)) {
1601                CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1602                        PTR_ERR(ec_child));
1603                RETURN(PTR_ERR(ec_child));
1604        }
1605
1606        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1607        if (child == NULL) {
1608                CERROR("Can not locate the child "DFID"\n", PFID(fid));
1609                GOTO(out_put, rc = -EINVAL);
1610        }
1611
1612        CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1613               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1614
1615        /*
1616         * Do not perform lookup sanity check. We know that name does not exist.
1617         */
1618        spec->sp_cr_lookup = 0;
1619        rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1620        if (rc) {
1621                CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1622                GOTO(out_put, rc);
1623        }
1624        CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1625               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1626        EXIT;
1627out_put:
1628        lu_object_put(env, ec_child);
1629        return rc;
1630}
1631
1632static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1633                             struct md_attr *ma)
1634{
1635        struct echo_thread_info *info = echo_env_info(env);
1636
1637        if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1638                ma->ma_lmm = (void *)&info->eti_lmm;
1639                ma->ma_lmm_size = sizeof(info->eti_lmm);
1640        } else {
1641                LASSERT(info->eti_big_lmmsize);
1642                ma->ma_lmm = info->eti_big_lmm;
1643                ma->ma_lmm_size = info->eti_big_lmmsize;
1644        }
1645
1646        return 0;
1647}
1648
1649static int echo_create_md_object(const struct lu_env *env,
1650                                 struct echo_device *ed,
1651                                 struct lu_object *ec_parent,
1652                                 struct lu_fid *fid,
1653                                 char *name, int namelen,
1654                                 __u64 id, __u32 mode, int count,
1655                                 int stripe_count, int stripe_offset)
1656{
1657        struct lu_object        *parent;
1658        struct echo_thread_info *info = echo_env_info(env);
1659        struct lu_name    *lname = &info->eti_lname;
1660        struct md_op_spec       *spec = &info->eti_spec;
1661        struct md_attr    *ma = &info->eti_ma;
1662        struct lu_device        *ld = ed->ed_next;
1663        int                   rc = 0;
1664        int                   i;
1665
1666        ENTRY;
1667
1668        if (ec_parent == NULL)
1669                return -1;
1670        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1671        if (parent == NULL)
1672                RETURN(-ENXIO);
1673
1674        memset(ma, 0, sizeof(*ma));
1675        memset(spec, 0, sizeof(*spec));
1676        if (stripe_count != 0) {
1677                spec->sp_cr_flags |= FMODE_WRITE;
1678                echo_set_lmm_size(env, ld, ma);
1679                if (stripe_count != -1) {
1680                        struct lov_user_md_v3 *lum = &info->eti_lum;
1681
1682                        lum->lmm_magic = LOV_USER_MAGIC_V3;
1683                        lum->lmm_stripe_count = stripe_count;
1684                        lum->lmm_stripe_offset = stripe_offset;
1685                        lum->lmm_pattern = 0;
1686                        spec->u.sp_ea.eadata = lum;
1687                        spec->u.sp_ea.eadatalen = sizeof(*lum);
1688                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1689                }
1690        }
1691
1692        ma->ma_attr.la_mode = mode;
1693        ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1694        ma->ma_attr.la_ctime = cfs_time_current_64();
1695
1696        if (name != NULL) {
1697                lname->ln_name = name;
1698                lname->ln_namelen = namelen;
1699                /* If name is specified, only create one object by name */
1700                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1701                                             spec, ma);
1702                RETURN(rc);
1703        }
1704
1705        /* Create multiple object sequenced by id */
1706        for (i = 0; i < count; i++) {
1707                char *tmp_name = info->eti_name;
1708
1709                echo_md_build_name(lname, tmp_name, id);
1710
1711                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1712                                             spec, ma);
1713                if (rc) {
1714                        CERROR("Can not create child %s: rc = %d\n", tmp_name,
1715                                rc);
1716                        break;
1717                }
1718                id++;
1719                fid->f_oid++;
1720        }
1721
1722        RETURN(rc);
1723}
1724
1725static struct lu_object *echo_md_lookup(const struct lu_env *env,
1726                                        struct echo_device *ed,
1727                                        struct md_object *parent,
1728                                        struct lu_name *lname)
1729{
1730        struct echo_thread_info *info = echo_env_info(env);
1731        struct lu_fid      *fid = &info->eti_fid;
1732        struct lu_object        *child;
1733        int    rc;
1734        ENTRY;
1735
1736        CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1737               PFID(fid), parent);
1738        rc = mdo_lookup(env, parent, lname, fid, NULL);
1739        if (rc) {
1740                CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1741                RETURN(ERR_PTR(rc));
1742        }
1743
1744        /* In the function below, .hs_keycmp resolves to
1745         * lu_obj_hop_keycmp() */
1746        /* coverity[overrun-buffer-val] */
1747        child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1748
1749        RETURN(child);
1750}
1751
1752static int echo_setattr_object(const struct lu_env *env,
1753                               struct echo_device *ed,
1754                               struct lu_object *ec_parent,
1755                               __u64 id, int count)
1756{
1757        struct lu_object        *parent;
1758        struct echo_thread_info *info = echo_env_info(env);
1759        struct lu_name    *lname = &info->eti_lname;
1760        char                *name = info->eti_name;
1761        struct lu_device        *ld = ed->ed_next;
1762        struct lu_buf      *buf = &info->eti_buf;
1763        int                   rc = 0;
1764        int                   i;
1765
1766        ENTRY;
1767
1768        if (ec_parent == NULL)
1769                return -1;
1770        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1771        if (parent == NULL)
1772                RETURN(-ENXIO);
1773
1774        for (i = 0; i < count; i++) {
1775                struct lu_object *ec_child, *child;
1776
1777                echo_md_build_name(lname, name, id);
1778
1779                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1780                if (IS_ERR(ec_child)) {
1781                        CERROR("Can't find child %s: rc = %ld\n",
1782                                lname->ln_name, PTR_ERR(ec_child));
1783                        RETURN(PTR_ERR(ec_child));
1784                }
1785
1786                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1787                if (child == NULL) {
1788                        CERROR("Can not locate the child %s\n", lname->ln_name);
1789                        lu_object_put(env, ec_child);
1790                        rc = -EINVAL;
1791                        break;
1792                }
1793
1794                CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1795                       PFID(lu_object_fid(child)));
1796
1797                buf->lb_buf = info->eti_xattr_buf;
1798                buf->lb_len = sizeof(info->eti_xattr_buf);
1799
1800                sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1801                rc = mo_xattr_set(env, lu2md(child), buf, name,
1802                                  LU_XATTR_CREATE);
1803                if (rc < 0) {
1804                        CERROR("Can not setattr child "DFID": rc = %d\n",
1805                                PFID(lu_object_fid(child)), rc);
1806                        lu_object_put(env, ec_child);
1807                        break;
1808                }
1809                CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1810                       PFID(lu_object_fid(child)));
1811                id++;
1812                lu_object_put(env, ec_child);
1813        }
1814        RETURN(rc);
1815}
1816
1817static int echo_getattr_object(const struct lu_env *env,
1818                               struct echo_device *ed,
1819                               struct lu_object *ec_parent,
1820                               __u64 id, int count)
1821{
1822        struct lu_object        *parent;
1823        struct echo_thread_info *info = echo_env_info(env);
1824        struct lu_name    *lname = &info->eti_lname;
1825        char                *name = info->eti_name;
1826        struct md_attr    *ma = &info->eti_ma;
1827        struct lu_device        *ld = ed->ed_next;
1828        int                   rc = 0;
1829        int                   i;
1830
1831        ENTRY;
1832
1833        if (ec_parent == NULL)
1834                return -1;
1835        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1836        if (parent == NULL)
1837                RETURN(-ENXIO);
1838
1839        memset(ma, 0, sizeof(*ma));
1840        ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1841        ma->ma_acl = info->eti_xattr_buf;
1842        ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1843
1844        for (i = 0; i < count; i++) {
1845                struct lu_object *ec_child, *child;
1846
1847                ma->ma_valid = 0;
1848                echo_md_build_name(lname, name, id);
1849                echo_set_lmm_size(env, ld, ma);
1850
1851                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1852                if (IS_ERR(ec_child)) {
1853                        CERROR("Can't find child %s: rc = %ld\n",
1854                               lname->ln_name, PTR_ERR(ec_child));
1855                        RETURN(PTR_ERR(ec_child));
1856                }
1857
1858                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1859                if (child == NULL) {
1860                        CERROR("Can not locate the child %s\n", lname->ln_name);
1861                        lu_object_put(env, ec_child);
1862                        RETURN(-EINVAL);
1863                }
1864
1865                CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1866                       PFID(lu_object_fid(child)));
1867                rc = echo_attr_get_complex(env, lu2md(child), ma);
1868                if (rc) {
1869                        CERROR("Can not getattr child "DFID": rc = %d\n",
1870                                PFID(lu_object_fid(child)), rc);
1871                        lu_object_put(env, ec_child);
1872                        break;
1873                }
1874                CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1875                       PFID(lu_object_fid(child)));
1876                id++;
1877                lu_object_put(env, ec_child);
1878        }
1879
1880        RETURN(rc);
1881}
1882
1883static int echo_lookup_object(const struct lu_env *env,
1884                              struct echo_device *ed,
1885                              struct lu_object *ec_parent,
1886                              __u64 id, int count)
1887{
1888        struct lu_object        *parent;
1889        struct echo_thread_info *info = echo_env_info(env);
1890        struct lu_name    *lname = &info->eti_lname;
1891        char                *name = info->eti_name;
1892        struct lu_fid      *fid = &info->eti_fid;
1893        struct lu_device        *ld = ed->ed_next;
1894        int                   rc = 0;
1895        int                   i;
1896
1897        if (ec_parent == NULL)
1898                return -1;
1899        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1900        if (parent == NULL)
1901                return -ENXIO;
1902
1903        /*prepare the requests*/
1904        for (i = 0; i < count; i++) {
1905                echo_md_build_name(lname, name, id);
1906
1907                CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1908                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
1909
1910                rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1911                if (rc) {
1912                        CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1913                        break;
1914                }
1915                CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1916                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
1917
1918                id++;
1919        }
1920        return rc;
1921}
1922
1923static int echo_md_destroy_internal(const struct lu_env *env,
1924                                    struct echo_device *ed,
1925                                    struct md_object *parent,
1926                                    struct lu_name *lname,
1927                                    struct md_attr *ma)
1928{
1929        struct lu_device   *ld = ed->ed_next;
1930        struct lu_object   *ec_child;
1931        struct lu_object   *child;
1932        int              rc;
1933
1934        ENTRY;
1935
1936        ec_child = echo_md_lookup(env, ed, parent, lname);
1937        if (IS_ERR(ec_child)) {
1938                CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1939                        PTR_ERR(ec_child));
1940                RETURN(PTR_ERR(ec_child));
1941        }
1942
1943        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1944        if (child == NULL) {
1945                CERROR("Can not locate the child %s\n", lname->ln_name);
1946                GOTO(out_put, rc = -EINVAL);
1947        }
1948
1949        CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1950               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1951
1952        rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1953        if (rc) {
1954                CERROR("Can not unlink child %s: rc = %d\n",
1955                        lname->ln_name, rc);
1956                GOTO(out_put, rc);
1957        }
1958        CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1959               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1960out_put:
1961        lu_object_put(env, ec_child);
1962        return rc;
1963}
1964
1965static int echo_destroy_object(const struct lu_env *env,
1966                               struct echo_device *ed,
1967                               struct lu_object *ec_parent,
1968                               char *name, int namelen,
1969                               __u64 id, __u32 mode,
1970                               int count)
1971{
1972        struct echo_thread_info *info = echo_env_info(env);
1973        struct lu_name    *lname = &info->eti_lname;
1974        struct md_attr    *ma = &info->eti_ma;
1975        struct lu_device        *ld = ed->ed_next;
1976        struct lu_object        *parent;
1977        int                   rc = 0;
1978        int                   i;
1979        ENTRY;
1980
1981        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1982        if (parent == NULL)
1983                RETURN(-EINVAL);
1984
1985        memset(ma, 0, sizeof(*ma));
1986        ma->ma_attr.la_mode = mode;
1987        ma->ma_attr.la_valid = LA_CTIME;
1988        ma->ma_attr.la_ctime = cfs_time_current_64();
1989        ma->ma_need = MA_INODE;
1990        ma->ma_valid = 0;
1991
1992        if (name != NULL) {
1993                lname->ln_name = name;
1994                lname->ln_namelen = namelen;
1995                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1996                                              ma);
1997                RETURN(rc);
1998        }
1999
2000        /*prepare the requests*/
2001        for (i = 0; i < count; i++) {
2002                char *tmp_name = info->eti_name;
2003
2004                ma->ma_valid = 0;
2005                echo_md_build_name(lname, tmp_name, id);
2006
2007                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
2008                                              ma);
2009                if (rc) {
2010                        CERROR("Can not unlink child %s: rc = %d\n", name, rc);
2011                        break;
2012                }
2013                id++;
2014        }
2015
2016        RETURN(rc);
2017}
2018
2019static struct lu_object *echo_resolve_path(const struct lu_env *env,
2020                                           struct echo_device *ed, char *path,
2021                                           int path_len)
2022{
2023        struct lu_device        *ld = ed->ed_next;
2024        struct md_device        *md = lu2md_dev(ld);
2025        struct echo_thread_info *info = echo_env_info(env);
2026        struct lu_fid      *fid = &info->eti_fid;
2027        struct lu_name    *lname = &info->eti_lname;
2028        struct lu_object        *parent = NULL;
2029        struct lu_object        *child = NULL;
2030        int rc = 0;
2031        ENTRY;
2032
2033        /*Only support MDD layer right now*/
2034        rc = md->md_ops->mdo_root_get(env, md, fid);
2035        if (rc) {
2036                CERROR("get root error: rc = %d\n", rc);
2037                RETURN(ERR_PTR(rc));
2038        }
2039
2040        /* In the function below, .hs_keycmp resolves to
2041         * lu_obj_hop_keycmp() */
2042        /* coverity[overrun-buffer-val] */
2043        parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
2044        if (IS_ERR(parent)) {
2045                CERROR("Can not find the parent "DFID": rc = %ld\n",
2046                        PFID(fid), PTR_ERR(parent));
2047                RETURN(parent);
2048        }
2049
2050        while (1) {
2051                struct lu_object *ld_parent;
2052                char *e;
2053
2054                e = strsep(&path, "/");
2055                if (e == NULL)
2056                        break;
2057
2058                if (e[0] == 0) {
2059                        if (!path || path[0] == '\0')
2060                                break;
2061                        continue;
2062                }
2063
2064                lname->ln_name = e;
2065                lname->ln_namelen = strlen(e);
2066
2067                ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
2068                if (ld_parent == NULL) {
2069                        lu_object_put(env, parent);
2070                        rc = -EINVAL;
2071                        break;
2072                }
2073
2074                child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
2075                lu_object_put(env, parent);
2076                if (IS_ERR(child)) {
2077                        rc = (int)PTR_ERR(child);
2078                        CERROR("lookup %s under parent "DFID": rc = %d\n",
2079                                lname->ln_name, PFID(lu_object_fid(ld_parent)),
2080                                rc);
2081                        break;
2082                }
2083                parent = child;
2084        }
2085        if (rc)
2086                RETURN(ERR_PTR(rc));
2087
2088        RETURN(parent);
2089}
2090
2091static void echo_ucred_init(struct lu_env *env)
2092{
2093        struct lu_ucred *ucred = lu_ucred(env);
2094
2095        ucred->uc_valid = UCRED_INVALID;
2096
2097        ucred->uc_suppgids[0] = -1;
2098        ucred->uc_suppgids[1] = -1;
2099
2100        ucred->uc_uid   = ucred->uc_o_uid   = current_uid();
2101        ucred->uc_gid   = ucred->uc_o_gid   = current_gid();
2102        ucred->uc_fsuid = ucred->uc_o_fsuid = current_fsuid();
2103        ucred->uc_fsgid = ucred->uc_o_fsgid = current_fsgid();
2104        ucred->uc_cap   = cfs_curproc_cap_pack();
2105
2106        /* remove fs privilege for non-root user. */
2107        if (ucred->uc_fsuid)
2108                ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2109        ucred->uc_valid = UCRED_NEW;
2110}
2111
2112static void echo_ucred_fini(struct lu_env *env)
2113{
2114        struct lu_ucred *ucred = lu_ucred(env);
2115        ucred->uc_valid = UCRED_INIT;
2116}
2117
2118#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2119#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION)
2120static int echo_md_handler(struct echo_device *ed, int command,
2121                           char *path, int path_len, __u64 id, int count,
2122                           struct obd_ioctl_data *data)
2123{
2124        struct echo_thread_info *info;
2125        struct lu_device      *ld = ed->ed_next;
2126        struct lu_env    *env;
2127        int                 refcheck;
2128        struct lu_object      *parent;
2129        char              *name = NULL;
2130        int                 namelen = data->ioc_plen2;
2131        int                 rc = 0;
2132        ENTRY;
2133
2134        if (ld == NULL) {
2135                CERROR("MD echo client is not being initialized properly\n");
2136                RETURN(-EINVAL);
2137        }
2138
2139        if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2140                CERROR("Only support MDD layer right now!\n");
2141                RETURN(-EINVAL);
2142        }
2143
2144        env = cl_env_get(&refcheck);
2145        if (IS_ERR(env))
2146                RETURN(PTR_ERR(env));
2147
2148        rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2149        if (rc != 0)
2150                GOTO(out_env, rc);
2151
2152        /* init big_lmm buffer */
2153        info = echo_env_info(env);
2154        LASSERT(info->eti_big_lmm == NULL);
2155        OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2156        if (info->eti_big_lmm == NULL)
2157                GOTO(out_env, rc = -ENOMEM);
2158        info->eti_big_lmmsize = MIN_MD_SIZE;
2159
2160        parent = echo_resolve_path(env, ed, path, path_len);
2161        if (IS_ERR(parent)) {
2162                CERROR("Can not resolve the path %s: rc = %ld\n", path,
2163                        PTR_ERR(parent));
2164                GOTO(out_free, rc = PTR_ERR(parent));
2165        }
2166
2167        if (namelen > 0) {
2168                OBD_ALLOC(name, namelen + 1);
2169                if (name == NULL)
2170                        GOTO(out_put, rc = -ENOMEM);
2171                if (copy_from_user(name, data->ioc_pbuf2, namelen))
2172                        GOTO(out_name, rc = -EFAULT);
2173        }
2174
2175        echo_ucred_init(env);
2176
2177        switch (command) {
2178        case ECHO_MD_CREATE:
2179        case ECHO_MD_MKDIR: {
2180                struct echo_thread_info *info = echo_env_info(env);
2181                __u32 mode = data->ioc_obdo2.o_mode;
2182                struct lu_fid *fid = &info->eti_fid;
2183                int stripe_count = (int)data->ioc_obdo2.o_misc;
2184                int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2185
2186                rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2187                if (rc != 0)
2188                        break;
2189
2190                /* In the function below, .hs_keycmp resolves to
2191                 * lu_obj_hop_keycmp() */
2192                /* coverity[overrun-buffer-val] */
2193                rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2194                                           id, mode, count, stripe_count,
2195                                           stripe_index);
2196                break;
2197        }
2198        case ECHO_MD_DESTROY:
2199        case ECHO_MD_RMDIR: {
2200                __u32 mode = data->ioc_obdo2.o_mode;
2201
2202                rc = echo_destroy_object(env, ed, parent, name, namelen,
2203                                         id, mode, count);
2204                break;
2205        }
2206        case ECHO_MD_LOOKUP:
2207                rc = echo_lookup_object(env, ed, parent, id, count);
2208                break;
2209        case ECHO_MD_GETATTR:
2210                rc = echo_getattr_object(env, ed, parent, id, count);
2211                break;
2212        case ECHO_MD_SETATTR:
2213                rc = echo_setattr_object(env, ed, parent, id, count);
2214                break;
2215        default:
2216                CERROR("unknown command %d\n", command);
2217                rc = -EINVAL;
2218                break;
2219        }
2220        echo_ucred_fini(env);
2221
2222out_name:
2223        if (name != NULL)
2224                OBD_FREE(name, namelen + 1);
2225out_put:
2226        lu_object_put(env, parent);
2227out_free:
2228        LASSERT(info->eti_big_lmm);
2229        OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2230        info->eti_big_lmm = NULL;
2231        info->eti_big_lmmsize = 0;
2232out_env:
2233        cl_env_put(env, &refcheck);
2234        return rc;
2235}
2236
2237static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2238                              int on_target, struct obdo *oa, void *ulsm,
2239                              int ulsm_nob, struct obd_trans_info *oti)
2240{
2241        struct echo_object     *eco;
2242        struct echo_client_obd *ec = ed->ed_ec;
2243        struct lov_stripe_md   *lsm = NULL;
2244        int                  rc;
2245        int                  created = 0;
2246        ENTRY;
2247
2248        if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
2249            (on_target ||                      /* set_stripe */
2250             ec->ec_nstripes != 0)) {      /* LOV */
2251                CERROR ("No valid oid\n");
2252                RETURN(-EINVAL);
2253        }
2254
2255        rc = echo_alloc_memmd(ed, &lsm);
2256        if (rc < 0) {
2257                CERROR("Cannot allocate md: rc = %d\n", rc);
2258                GOTO(failed, rc);
2259        }
2260
2261        if (ulsm != NULL) {
2262                int i, idx;
2263
2264                rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
2265                if (rc != 0)
2266                        GOTO(failed, rc);
2267
2268                if (lsm->lsm_stripe_count == 0)
2269                        lsm->lsm_stripe_count = ec->ec_nstripes;
2270
2271                if (lsm->lsm_stripe_size == 0)
2272                        lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
2273
2274                idx = cfs_rand();
2275
2276                /* setup stripes: indices + default ids if required */
2277                for (i = 0; i < lsm->lsm_stripe_count; i++) {
2278                        if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
2279                                lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
2280
2281                        lsm->lsm_oinfo[i]->loi_ost_idx =
2282                                (idx + i) % ec->ec_nstripes;
2283                }
2284        }
2285
2286        /* setup object ID here for !on_target and LOV hint */
2287        if (oa->o_valid & OBD_MD_FLID) {
2288                LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2289                lsm->lsm_oi = oa->o_oi;
2290        }
2291
2292        if (ostid_id(&lsm->lsm_oi) == 0)
2293                ostid_set_id(&lsm->lsm_oi, ++last_object_id);
2294
2295        rc = 0;
2296        if (on_target) {
2297                /* Only echo objects are allowed to be created */
2298                LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
2299                        (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
2300                rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
2301                if (rc != 0) {
2302                        CERROR("Cannot create objects: rc = %d\n", rc);
2303                        GOTO(failed, rc);
2304                }
2305                created = 1;
2306        }
2307
2308        /* See what object ID we were given */
2309        oa->o_oi = lsm->lsm_oi;
2310        oa->o_valid |= OBD_MD_FLID;
2311
2312        eco = cl_echo_object_find(ed, &lsm);
2313        if (IS_ERR(eco))
2314                GOTO(failed, rc = PTR_ERR(eco));
2315        cl_echo_object_put(eco);
2316
2317        CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2318        EXIT;
2319
2320 failed:
2321        if (created && rc)
2322                obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
2323        if (lsm)
2324                echo_free_memmd(ed, &lsm);
2325        if (rc)
2326                CERROR("create object failed with: rc = %d\n", rc);
2327        return (rc);
2328}
2329
2330static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2331                           struct obdo *oa)
2332{
2333        struct lov_stripe_md   *lsm = NULL;
2334        struct echo_object     *eco;
2335        int                  rc;
2336        ENTRY;
2337
2338        if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
2339                /* disallow use of object id 0 */
2340                CERROR ("No valid oid\n");
2341                RETURN(-EINVAL);
2342        }
2343
2344        rc = echo_alloc_memmd(ed, &lsm);
2345        if (rc < 0)
2346                RETURN(rc);
2347
2348        lsm->lsm_oi = oa->o_oi;
2349        if (!(oa->o_valid & OBD_MD_FLGROUP))
2350                ostid_set_seq_echo(&lsm->lsm_oi);
2351
2352        rc = 0;
2353        eco = cl_echo_object_find(ed, &lsm);
2354        if (!IS_ERR(eco))
2355                *ecop = eco;
2356        else
2357                rc = PTR_ERR(eco);
2358        if (lsm)
2359                echo_free_memmd(ed, &lsm);
2360        RETURN(rc);
2361}
2362
/**
 * Drop an echo object through cl_echo_object_put() and log an error when
 * that call returns non-zero.
 */
static void echo_put_object(struct echo_object *eco)
{
        /* the message is newline-terminated like every other CERROR here */
        if (cl_echo_object_put(eco))
                CERROR("echo client: drop an object failed\n");
}
2368
2369static void
2370echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
2371{
2372        unsigned long stripe_count;
2373        unsigned long stripe_size;
2374        unsigned long width;
2375        unsigned long woffset;
2376        int        stripe_index;
2377        obd_off       offset;
2378
2379        if (lsm->lsm_stripe_count <= 1)
2380                return;
2381
2382        offset       = *offp;
2383        stripe_size  = lsm->lsm_stripe_size;
2384        stripe_count = lsm->lsm_stripe_count;
2385
2386        /* width = # bytes in all stripes */
2387        width = stripe_size * stripe_count;
2388
2389        /* woffset = offset within a width; offset = whole number of widths */
2390        woffset = do_div (offset, width);
2391
2392        stripe_index = woffset / stripe_size;
2393
2394        *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
2395        *offp = offset * stripe_size + woffset % stripe_size;
2396}
2397
2398static void
2399echo_client_page_debug_setup(struct lov_stripe_md *lsm,
2400                             struct page *page, int rw, obd_id id,
2401                             obd_off offset, obd_off count)
2402{
2403        char    *addr;
2404        obd_off  stripe_off;
2405        obd_id   stripe_id;
2406        int      delta;
2407
2408        /* no partial pages on the client */
2409        LASSERT(count == PAGE_CACHE_SIZE);
2410
2411        addr = kmap(page);
2412
2413        for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2414                if (rw == OBD_BRW_WRITE) {
2415                        stripe_off = offset + delta;
2416                        stripe_id = id;
2417                        echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
2418                } else {
2419                        stripe_off = 0xdeadbeef00c0ffeeULL;
2420                        stripe_id = 0xdeadbeef00c0ffeeULL;
2421                }
2422                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2423                                  stripe_off, stripe_id);
2424        }
2425
2426        kunmap(page);
2427}
2428
2429static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
2430                                        struct page *page, obd_id id,
2431                                        obd_off offset, obd_off count)
2432{
2433        obd_off stripe_off;
2434        obd_id  stripe_id;
2435        char   *addr;
2436        int     delta;
2437        int     rc;
2438        int     rc2;
2439
2440        /* no partial pages on the client */
2441        LASSERT(count == PAGE_CACHE_SIZE);
2442
2443        addr = kmap(page);
2444
2445        for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2446                stripe_off = offset + delta;
2447                stripe_id = id;
2448                echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
2449
2450                rc2 = block_debug_check("test_brw",
2451                                        addr + delta, OBD_ECHO_BLOCK_SIZE,
2452                                        stripe_off, stripe_id);
2453                if (rc2 != 0) {
2454                        CERROR ("Error in echo object "LPX64"\n", id);
2455                        rc = rc2;
2456                }
2457        }
2458
2459        kunmap(page);
2460        return rc;
2461}
2462
2463static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2464                            struct echo_object *eco, obd_off offset,
2465                            obd_size count, int async,
2466                            struct obd_trans_info *oti)
2467{
2468        struct lov_stripe_md   *lsm = eco->eo_lsm;
2469        obd_count              npages;
2470        struct brw_page *pga;
2471        struct brw_page *pgp;
2472        struct page         **pages;
2473        obd_off          off;
2474        int                  i;
2475        int                  rc;
2476        int                  verify;
2477        int                  gfp_mask;
2478        int                  brw_flags = 0;
2479        ENTRY;
2480
2481        verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2482                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2483                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2484
2485        gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
2486
2487        LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2488        LASSERT(lsm != NULL);
2489        LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
2490
2491        if (count <= 0 ||
2492            (count & (~CFS_PAGE_MASK)) != 0)
2493                RETURN(-EINVAL);
2494
2495        /* XXX think again with misaligned I/O */
2496        npages = count >> PAGE_CACHE_SHIFT;
2497
2498        if (rw == OBD_BRW_WRITE)
2499                brw_flags = OBD_BRW_ASYNC;
2500
2501        OBD_ALLOC(pga, npages * sizeof(*pga));
2502        if (pga == NULL)
2503                RETURN(-ENOMEM);
2504
2505        OBD_ALLOC(pages, npages * sizeof(*pages));
2506        if (pages == NULL) {
2507                OBD_FREE(pga, npages * sizeof(*pga));
2508                RETURN(-ENOMEM);
2509        }
2510
2511        for (i = 0, pgp = pga, off = offset;
2512             i < npages;
2513             i++, pgp++, off += PAGE_CACHE_SIZE) {
2514
2515                LASSERT (pgp->pg == NULL);      /* for cleanup */
2516
2517                rc = -ENOMEM;
2518                OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
2519                if (pgp->pg == NULL)
2520                        goto out;
2521
2522                pages[i] = pgp->pg;
2523                pgp->count = PAGE_CACHE_SIZE;
2524                pgp->off = off;
2525                pgp->flag = brw_flags;
2526
2527                if (verify)
2528                        echo_client_page_debug_setup(lsm, pgp->pg, rw,
2529                                                     ostid_id(&oa->o_oi), off,
2530                                                     pgp->count);
2531        }
2532
2533        /* brw mode can only be used at client */
2534        LASSERT(ed->ed_next != NULL);
2535        rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2536
2537 out:
2538        if (rc != 0 || rw != OBD_BRW_READ)
2539                verify = 0;
2540
2541        for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2542                if (pgp->pg == NULL)
2543                        continue;
2544
2545                if (verify) {
2546                        int vrc;
2547                        vrc = echo_client_page_debug_check(lsm, pgp->pg,
2548                                                           ostid_id(&oa->o_oi),
2549                                                           pgp->off, pgp->count);
2550                        if (vrc != 0 && rc == 0)
2551                                rc = vrc;
2552                }
2553                OBD_PAGE_FREE(pgp->pg);
2554        }
2555        OBD_FREE(pga, npages * sizeof(*pga));
2556        OBD_FREE(pages, npages * sizeof(*pages));
2557        RETURN(rc);
2558}
2559
2560static int echo_client_prep_commit(const struct lu_env *env,
2561                                   struct obd_export *exp, int rw,
2562                                   struct obdo *oa, struct echo_object *eco,
2563                                   obd_off offset, obd_size count,
2564                                   obd_size batch, struct obd_trans_info *oti,
2565                                   int async)
2566{
2567        struct lov_stripe_md *lsm = eco->eo_lsm;
2568        struct obd_ioobj ioo;
2569        struct niobuf_local *lnb;
2570        struct niobuf_remote *rnb;
2571        obd_off off;
2572        obd_size npages, tot_pages;
2573        int i, ret = 0, brw_flags = 0;
2574
2575        ENTRY;
2576
2577        if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
2578            (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
2579                RETURN(-EINVAL);
2580
2581        npages = batch >> PAGE_CACHE_SHIFT;
2582        tot_pages = count >> PAGE_CACHE_SHIFT;
2583
2584        OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
2585        OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
2586
2587        if (lnb == NULL || rnb == NULL)
2588                GOTO(out, ret = -ENOMEM);
2589
2590        if (rw == OBD_BRW_WRITE && async)
2591                brw_flags |= OBD_BRW_ASYNC;
2592
2593        obdo_to_ioobj(oa, &ioo);
2594
2595        off = offset;
2596
2597        for(; tot_pages; tot_pages -= npages) {
2598                int lpages;
2599
2600                if (tot_pages < npages)
2601                        npages = tot_pages;
2602
2603                for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
2604                        rnb[i].offset = off;
2605                        rnb[i].len = PAGE_CACHE_SIZE;
2606                        rnb[i].flags = brw_flags;
2607                }
2608
2609                ioo.ioo_bufcnt = npages;
2610                oti->oti_transno = 0;
2611
2612                lpages = npages;
2613                ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
2614                                 lnb, oti, NULL);
2615                if (ret != 0)
2616                        GOTO(out, ret);
2617                LASSERT(lpages == npages);
2618
2619                for (i = 0; i < lpages; i++) {
2620                        struct page *page = lnb[i].page;
2621
2622                        /* read past eof? */
2623                        if (page == NULL && lnb[i].rc == 0)
2624                                continue;
2625
2626                        if (async)
2627                                lnb[i].flags |= OBD_BRW_ASYNC;
2628
2629                        if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2630                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2631                            (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2632                                continue;
2633
2634                        if (rw == OBD_BRW_WRITE)
2635                                echo_client_page_debug_setup(lsm, page, rw,
2636                                                            ostid_id(&oa->o_oi),
2637                                                             rnb[i].offset,
2638                                                             rnb[i].len);
2639                        else
2640                                echo_client_page_debug_check(lsm, page,
2641                                                            ostid_id(&oa->o_oi),
2642                                                             rnb[i].offset,
2643                                                             rnb[i].len);
2644                }
2645
2646                ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
2647                                   rnb, npages, lnb, oti, ret);
2648                if (ret != 0)
2649                        GOTO(out, ret);
2650
2651                /* Reset oti otherwise it would confuse ldiskfs. */
2652                memset(oti, 0, sizeof(*oti));
2653
2654                /* Reuse env context. */
2655                lu_context_exit((struct lu_context *)&env->le_ctx);
2656                lu_context_enter((struct lu_context *)&env->le_ctx);
2657        }
2658
2659out:
2660        if (lnb)
2661                OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
2662        if (rnb)
2663                OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
2664        RETURN(ret);
2665}
2666
2667static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2668                                 struct obd_export *exp,
2669                                 struct obd_ioctl_data *data,
2670                                 struct obd_trans_info *dummy_oti)
2671{
2672        struct obd_device *obd = class_exp2obd(exp);
2673        struct echo_device *ed = obd2echo_dev(obd);
2674        struct echo_client_obd *ec = ed->ed_ec;
2675        struct obdo *oa = &data->ioc_obdo1;
2676        struct echo_object *eco;
2677        int rc;
2678        int async = 1;
2679        long test_mode;
2680        ENTRY;
2681
2682        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2683
2684        rc = echo_get_object(&eco, ed, oa);
2685        if (rc)
2686                RETURN(rc);
2687
2688        oa->o_valid &= ~OBD_MD_FLHANDLE;
2689
2690        /* OFD/obdfilter works only via prep/commit */
2691        test_mode = (long)data->ioc_pbuf1;
2692        if (test_mode == 1)
2693                async = 0;
2694
2695        if (ed->ed_next == NULL && test_mode != 3) {
2696                test_mode = 3;
2697                data->ioc_plen1 = data->ioc_count;
2698        }
2699
2700        /* Truncate batch size to maximum */
2701        if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2702                data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2703
2704        switch (test_mode) {
2705        case 1:
2706                /* fall through */
2707        case 2:
2708                rc = echo_client_kbrw(ed, rw, oa,
2709                                      eco, data->ioc_offset,
2710                                      data->ioc_count, async, dummy_oti);
2711                break;
2712        case 3:
2713                rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
2714                                             eco, data->ioc_offset,
2715                                             data->ioc_count, data->ioc_plen1,
2716                                             dummy_oti, async);
2717                break;
2718        default:
2719                rc = -EINVAL;
2720        }
2721        echo_put_object(eco);
2722        RETURN(rc);
2723}
2724
2725static int
2726echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
2727                    int mode, obd_off offset, obd_size nob)
2728{
2729        struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
2730        struct lustre_handle   *ulh = &oa->o_handle;
2731        struct echo_object     *eco;
2732        obd_off          end;
2733        int                  rc;
2734        ENTRY;
2735
2736        if (ed->ed_next == NULL)
2737                RETURN(-EOPNOTSUPP);
2738
2739        if (!(mode == LCK_PR || mode == LCK_PW))
2740                RETURN(-EINVAL);
2741
2742        if ((offset & (~CFS_PAGE_MASK)) != 0 ||
2743            (nob & (~CFS_PAGE_MASK)) != 0)
2744                RETURN(-EINVAL);
2745
2746        rc = echo_get_object (&eco, ed, oa);
2747        if (rc != 0)
2748                RETURN(rc);
2749
2750        end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
2751        rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
2752        if (rc == 0) {
2753                oa->o_valid |= OBD_MD_FLHANDLE;
2754                CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie);
2755        }
2756        echo_put_object(eco);
2757        RETURN(rc);
2758}
2759
2760static int
2761echo_client_cancel(struct obd_export *exp, struct obdo *oa)
2762{
2763        struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
2764        __u64          cookie = oa->o_handle.cookie;
2765
2766        if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
2767                return -EINVAL;
2768
2769        CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie);
2770        return cl_echo_cancel(ed, cookie);
2771}
2772
2773static int
2774echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2775                      void *karg, void *uarg)
2776{
2777        struct obd_device      *obd = exp->exp_obd;
2778        struct echo_device     *ed = obd2echo_dev(obd);
2779        struct echo_client_obd *ec = ed->ed_ec;
2780        struct echo_object     *eco;
2781        struct obd_ioctl_data  *data = karg;
2782        struct obd_trans_info   dummy_oti;
2783        struct lu_env     *env;
2784        struct oti_req_ack_lock *ack_lock;
2785        struct obdo         *oa;
2786        struct lu_fid      fid;
2787        int                  rw = OBD_BRW_READ;
2788        int                  rc = 0;
2789        int                  i;
2790        ENTRY;
2791
2792        memset(&dummy_oti, 0, sizeof(dummy_oti));
2793
2794        oa = &data->ioc_obdo1;
2795        if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2796                oa->o_valid |= OBD_MD_FLGROUP;
2797                ostid_set_seq_echo(&oa->o_oi);
2798        }
2799
2800        /* This FID is unpacked just for validation at this point */
2801        rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2802        if (rc < 0)
2803                RETURN(rc);
2804
2805        OBD_ALLOC_PTR(env);
2806        if (env == NULL)
2807                RETURN(-ENOMEM);
2808
2809        rc = lu_env_init(env, LCT_DT_THREAD);
2810        if (rc)
2811                GOTO(out, rc = -ENOMEM);
2812
2813        switch (cmd) {
2814        case OBD_IOC_CREATE:                /* may create echo object */
2815                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2816                        GOTO (out, rc = -EPERM);
2817
2818                rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
2819                                        data->ioc_plen1, &dummy_oti);
2820                GOTO(out, rc);
2821
2822        case OBD_IOC_ECHO_MD: {
2823                int count;
2824                int cmd;
2825                char *dir = NULL;
2826                int dirlen;
2827                __u64 id;
2828
2829                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2830                        GOTO(out, rc = -EPERM);
2831
2832                count = data->ioc_count;
2833                cmd = data->ioc_command;
2834
2835                id = ostid_id(&data->ioc_obdo2.o_oi);
2836
2837                dirlen = data->ioc_plen1;
2838                OBD_ALLOC(dir, dirlen + 1);
2839                if (dir == NULL)
2840                        GOTO(out, rc = -ENOMEM);
2841
2842                if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2843                        OBD_FREE(dir, data->ioc_plen1 + 1);
2844                        GOTO(out, rc = -EFAULT);
2845                }
2846
2847                rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2848                OBD_FREE(dir, dirlen + 1);
2849                GOTO(out, rc);
2850        }
2851        case OBD_IOC_ECHO_ALLOC_SEQ: {
2852                struct lu_env   *cl_env;
2853                int           refcheck;
2854                __u64       seq;
2855                int           max_count;
2856
2857                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2858                        GOTO(out, rc = -EPERM);
2859
2860                cl_env = cl_env_get(&refcheck);
2861                if (IS_ERR(cl_env))
2862                        GOTO(out, rc = PTR_ERR(cl_env));
2863
2864                rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2865                                            ECHO_MD_SES_TAG);
2866                if (rc != 0) {
2867                        cl_env_put(cl_env, &refcheck);
2868                        GOTO(out, rc);
2869                }
2870
2871                rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2872                cl_env_put(cl_env, &refcheck);
2873                if (rc < 0) {
2874                        CERROR("%s: Can not alloc seq: rc = %d\n",
2875                               obd->obd_name, rc);
2876                        GOTO(out, rc);
2877                }
2878
2879                if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2880                        return -EFAULT;
2881
2882                max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2883                if (copy_to_user(data->ioc_pbuf2, &max_count,
2884                                     data->ioc_plen2))
2885                        return -EFAULT;
2886                GOTO(out, rc);
2887        }
2888        case OBD_IOC_DESTROY:
2889                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2890                        GOTO (out, rc = -EPERM);
2891
2892                rc = echo_get_object(&eco, ed, oa);
2893                if (rc == 0) {
2894                        rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
2895                                         &dummy_oti, NULL, NULL);
2896                        if (rc == 0)
2897                                eco->eo_deleted = 1;
2898                        echo_put_object(eco);
2899                }
2900                GOTO(out, rc);
2901
2902        case OBD_IOC_GETATTR:
2903                rc = echo_get_object(&eco, ed, oa);
2904                if (rc == 0) {
2905                        struct obd_info oinfo = { { { 0 } } };
2906                        oinfo.oi_md = eco->eo_lsm;
2907                        oinfo.oi_oa = oa;
2908                        rc = obd_getattr(env, ec->ec_exp, &oinfo);
2909                        echo_put_object(eco);
2910                }
2911                GOTO(out, rc);
2912
2913        case OBD_IOC_SETATTR:
2914                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2915                        GOTO (out, rc = -EPERM);
2916
2917                rc = echo_get_object(&eco, ed, oa);
2918                if (rc == 0) {
2919                        struct obd_info oinfo = { { { 0 } } };
2920                        oinfo.oi_oa = oa;
2921                        oinfo.oi_md = eco->eo_lsm;
2922
2923                        rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
2924                        echo_put_object(eco);
2925                }
2926                GOTO(out, rc);
2927
2928        case OBD_IOC_BRW_WRITE:
2929                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2930                        GOTO (out, rc = -EPERM);
2931
2932                rw = OBD_BRW_WRITE;
2933                /* fall through */
2934        case OBD_IOC_BRW_READ:
2935                rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
2936                GOTO(out, rc);
2937
2938        case ECHO_IOC_GET_STRIPE:
2939                rc = echo_get_object(&eco, ed, oa);
2940                if (rc == 0) {
2941                        rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
2942                                              data->ioc_plen1);
2943                        echo_put_object(eco);
2944                }
2945                GOTO(out, rc);
2946
2947        case ECHO_IOC_SET_STRIPE:
2948                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2949                        GOTO (out, rc = -EPERM);
2950
2951                if (data->ioc_pbuf1 == NULL) {  /* unset */
2952                        rc = echo_get_object(&eco, ed, oa);
2953                        if (rc == 0) {
2954                                eco->eo_deleted = 1;
2955                                echo_put_object(eco);
2956                        }
2957                } else {
2958                        rc = echo_create_object(env, ed, 0, oa,
2959                                                data->ioc_pbuf1,
2960                                                data->ioc_plen1, &dummy_oti);
2961                }
2962                GOTO (out, rc);
2963
2964        case ECHO_IOC_ENQUEUE:
2965                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2966                        GOTO (out, rc = -EPERM);
2967
2968                rc = echo_client_enqueue(exp, oa,
2969                                         data->ioc_conn1, /* lock mode */
2970                                         data->ioc_offset,
2971                                         data->ioc_count);/*extent*/
2972                GOTO (out, rc);
2973
2974        case ECHO_IOC_CANCEL:
2975                rc = echo_client_cancel(exp, oa);
2976                GOTO (out, rc);
2977
2978        default:
2979                CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2980                GOTO (out, rc = -ENOTTY);
2981        }
2982
2983        EXIT;
2984out:
2985        lu_env_fini(env);
2986        OBD_FREE_PTR(env);
2987
2988        /* XXX this should be in a helper also called by target_send_reply */
2989        for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2990             i++, ack_lock++) {
2991                if (!ack_lock->mode)
2992                        break;
2993                ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2994        }
2995
2996        return rc;
2997}
2998
2999static int echo_client_setup(const struct lu_env *env,
3000                             struct obd_device *obddev, struct lustre_cfg *lcfg)
3001{
3002        struct echo_client_obd *ec = &obddev->u.echo_client;
3003        struct obd_device *tgt;
3004        struct obd_uuid echo_uuid = { "ECHO_UUID" };
3005        struct obd_connect_data *ocd = NULL;
3006        int rc;
3007        ENTRY;
3008
3009        if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
3010                CERROR("requires a TARGET OBD name\n");
3011                RETURN(-EINVAL);
3012        }
3013
3014        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
3015        if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
3016                CERROR("device not attached or not set up (%s)\n",
3017                       lustre_cfg_string(lcfg, 1));
3018                RETURN(-EINVAL);
3019        }
3020
3021        spin_lock_init(&ec->ec_lock);
3022        INIT_LIST_HEAD (&ec->ec_objects);
3023        INIT_LIST_HEAD (&ec->ec_locks);
3024        ec->ec_unique = 0;
3025        ec->ec_nstripes = 0;
3026
3027        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
3028                lu_context_tags_update(ECHO_MD_CTX_TAG);
3029                lu_session_tags_update(ECHO_MD_SES_TAG);
3030                RETURN(0);
3031        }
3032
3033        OBD_ALLOC(ocd, sizeof(*ocd));
3034        if (ocd == NULL) {
3035                CERROR("Can't alloc ocd connecting to %s\n",
3036                       lustre_cfg_string(lcfg, 1));
3037                return -ENOMEM;
3038        }
3039
3040        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
3041                                 OBD_CONNECT_BRW_SIZE |
3042                                 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
3043                                 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
3044                                 OBD_CONNECT_FID;
3045        ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
3046        ocd->ocd_version = LUSTRE_VERSION_CODE;
3047        ocd->ocd_group = FID_SEQ_ECHO;
3048
3049        rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
3050        if (rc == 0) {
3051                /* Turn off pinger because it connects to tgt obd directly. */
3052                spin_lock(&tgt->obd_dev_lock);
3053                list_del_init(&ec->ec_exp->exp_obd_chain_timed);
3054                spin_unlock(&tgt->obd_dev_lock);
3055        }
3056
3057        OBD_FREE(ocd, sizeof(*ocd));
3058
3059        if (rc != 0) {
3060                CERROR("fail to connect to device %s\n",
3061                       lustre_cfg_string(lcfg, 1));
3062                return (rc);
3063        }
3064
3065        RETURN(rc);
3066}
3067
3068static int echo_client_cleanup(struct obd_device *obddev)
3069{
3070        struct echo_device *ed = obd2echo_dev(obddev);
3071        struct echo_client_obd *ec = &obddev->u.echo_client;
3072        int rc;
3073        ENTRY;
3074
3075        /*Do nothing for Metadata echo client*/
3076        if (ed == NULL )
3077                RETURN(0);
3078
3079        if (ed->ed_next_ismd) {
3080                lu_context_tags_clear(ECHO_MD_CTX_TAG);
3081                lu_session_tags_clear(ECHO_MD_SES_TAG);
3082                RETURN(0);
3083        }
3084
3085        if (!list_empty(&obddev->obd_exports)) {
3086                CERROR("still has clients!\n");
3087                RETURN(-EBUSY);
3088        }
3089
3090        LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
3091        rc = obd_disconnect(ec->ec_exp);
3092        if (rc != 0)
3093                CERROR("fail to disconnect device: %d\n", rc);
3094
3095        RETURN(rc);
3096}
3097
3098static int echo_client_connect(const struct lu_env *env,
3099                               struct obd_export **exp,
3100                               struct obd_device *src, struct obd_uuid *cluuid,
3101                               struct obd_connect_data *data, void *localdata)
3102{
3103        int             rc;
3104        struct lustre_handle conn = { 0 };
3105
3106        ENTRY;
3107        rc = class_connect(&conn, src, cluuid);
3108        if (rc == 0) {
3109                *exp = class_conn2export(&conn);
3110        }
3111
3112        RETURN (rc);
3113}
3114
3115static int echo_client_disconnect(struct obd_export *exp)
3116{
3117#if 0
3118        struct obd_device      *obd;
3119        struct echo_client_obd *ec;
3120        struct ec_lock   *ecl;
3121#endif
3122        int                  rc;
3123        ENTRY;
3124
3125        if (exp == NULL)
3126                GOTO(out, rc = -EINVAL);
3127
3128#if 0
3129        obd = exp->exp_obd;
3130        ec = &obd->u.echo_client;
3131
3132        /* no more contention on export's lock list */
3133        while (!list_empty (&exp->exp_ec_data.eced_locks)) {
3134                ecl = list_entry (exp->exp_ec_data.eced_locks.next,
3135                                      struct ec_lock, ecl_exp_chain);
3136                list_del (&ecl->ecl_exp_chain);
3137
3138                rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
3139                                 ecl->ecl_mode, &ecl->ecl_lock_handle);
3140
3141                CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
3142                        "(%d)\n", ecl->ecl_object->eco_id, rc);
3143
3144                echo_put_object (ecl->ecl_object);
3145                OBD_FREE (ecl, sizeof (*ecl));
3146        }
3147#endif
3148
3149        rc = class_disconnect(exp);
3150        GOTO(out, rc);
3151 out:
3152        return rc;
3153}
3154
/**
 * OBD method table of the echo client, passed to class_register_type() by
 * echo_client_init().
 *
 * Only the ioctl, connect and disconnect methods are installed; the
 * setup/cleanup entries are compiled out by the "#if 0" block below.
 */
static struct obd_ops echo_client_obd_ops = {
        .o_owner       = THIS_MODULE,

#if 0
        .o_setup       = echo_client_setup,
        .o_cleanup     = echo_client_cleanup,
#endif

        .o_iocontrol   = echo_client_iocontrol,
        .o_connect     = echo_client_connect,
        .o_disconnect  = echo_client_disconnect
};
3167
3168int echo_client_init(void)
3169{
3170        struct lprocfs_static_vars lvars = { 0 };
3171        int rc;
3172
3173        lprocfs_echo_init_vars(&lvars);
3174
3175        rc = lu_kmem_init(echo_caches);
3176        if (rc == 0) {
3177                rc = class_register_type(&echo_client_obd_ops, NULL,
3178                                         lvars.module_vars,
3179                                         LUSTRE_ECHO_CLIENT_NAME,
3180                                         &echo_device_type);
3181                if (rc)
3182                        lu_kmem_fini(echo_caches);
3183        }
3184        return rc;
3185}
3186
/**
 * Undo echo_client_init(): unregister the LUSTRE_ECHO_CLIENT_NAME type and
 * then release echo_caches through lu_kmem_fini().
 */
void echo_client_exit(void)
{
        class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
        lu_kmem_fini(echo_caches);
}
3192
3193static int __init obdecho_init(void)
3194{
3195        struct lprocfs_static_vars lvars;
3196        int rc;
3197
3198        ENTRY;
3199        LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
3200
3201        LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
3202
3203        lprocfs_echo_init_vars(&lvars);
3204
3205
3206        rc = echo_client_init();
3207
3208        RETURN(rc);
3209}
3210
/**
 * Module exit hook: tears the echo client down through echo_client_exit().
 */
static void /*__exit*/ obdecho_exit(void)
{
        echo_client_exit();

}
3216
/* Module metadata, followed by registration of the init/exit entry points. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
MODULE_LICENSE("GPL");

cfs_module(obdecho, LUSTRE_VERSION_STRING, obdecho_init, obdecho_exit);
3222
3223/** @} echo_client */
3224