linux/drivers/staging/lustre/lustre/osc/osc_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_OSC
  38
  39#include <linux/libcfs/libcfs.h>
  40
  41
  42#include <lustre_dlm.h>
  43#include <lustre_net.h>
  44#include <lustre/lustre_user.h>
  45#include <obd_cksum.h>
  46#include <obd_ost.h>
  47#include <obd_lov.h>
  48
  49#ifdef  __CYGWIN__
  50# include <ctype.h>
  51#endif
  52
  53#include <lustre_ha.h>
  54#include <lprocfs_status.h>
  55#include <lustre_log.h>
  56#include <lustre_debug.h>
  57#include <lustre_param.h>
  58#include <lustre_fid.h>
  59#include "osc_internal.h"
  60#include "osc_cl_internal.h"
  61
  62static void osc_release_ppga(struct brw_page **ppga, obd_count count);
  63static int brw_interpret(const struct lu_env *env,
  64                         struct ptlrpc_request *req, void *data, int rc);
  65int osc_cleanup(struct obd_device *obd);
  66
  67/* Pack OSC object metadata for disk storage (LE byte order). */
  68static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
  69                      struct lov_stripe_md *lsm)
  70{
  71        int lmm_size;
  72
  73        lmm_size = sizeof(**lmmp);
  74        if (lmmp == NULL)
  75                return lmm_size;
  76
  77        if (*lmmp != NULL && lsm == NULL) {
  78                OBD_FREE(*lmmp, lmm_size);
  79                *lmmp = NULL;
  80                return 0;
  81        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
  82                return -EBADF;
  83        }
  84
  85        if (*lmmp == NULL) {
  86                OBD_ALLOC(*lmmp, lmm_size);
  87                if (*lmmp == NULL)
  88                        return -ENOMEM;
  89        }
  90
  91        if (lsm)
  92                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
  93
  94        return lmm_size;
  95}
  96
  97/* Unpack OSC object metadata from disk storage (LE byte order). */
  98static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
  99                        struct lov_mds_md *lmm, int lmm_bytes)
 100{
 101        int lsm_size;
 102        struct obd_import *imp = class_exp2cliimp(exp);
 103
 104        if (lmm != NULL) {
 105                if (lmm_bytes < sizeof(*lmm)) {
 106                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
 107                               exp->exp_obd->obd_name, lmm_bytes,
 108                               (int)sizeof(*lmm));
 109                        return -EINVAL;
 110                }
 111                /* XXX LOV_MAGIC etc check? */
 112
 113                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
 114                        CERROR("%s: zero lmm_object_id: rc = %d\n",
 115                               exp->exp_obd->obd_name, -EINVAL);
 116                        return -EINVAL;
 117                }
 118        }
 119
 120        lsm_size = lov_stripe_md_size(1);
 121        if (lsmp == NULL)
 122                return lsm_size;
 123
 124        if (*lsmp != NULL && lmm == NULL) {
 125                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 126                OBD_FREE(*lsmp, lsm_size);
 127                *lsmp = NULL;
 128                return 0;
 129        }
 130
 131        if (*lsmp == NULL) {
 132                OBD_ALLOC(*lsmp, lsm_size);
 133                if (unlikely(*lsmp == NULL))
 134                        return -ENOMEM;
 135                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 136                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
 137                        OBD_FREE(*lsmp, lsm_size);
 138                        return -ENOMEM;
 139                }
 140                loi_init((*lsmp)->lsm_oinfo[0]);
 141        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
 142                return -EBADF;
 143        }
 144
 145        if (lmm != NULL)
 146                /* XXX zero *lsmp? */
 147                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
 148
 149        if (imp != NULL &&
 150            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
 151                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
 152        else
 153                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
 154
 155        return lsm_size;
 156}
 157
 158static inline void osc_pack_capa(struct ptlrpc_request *req,
 159                                 struct ost_body *body, void *capa)
 160{
 161        struct obd_capa *oc = (struct obd_capa *)capa;
 162        struct lustre_capa *c;
 163
 164        if (!capa)
 165                return;
 166
 167        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
 168        LASSERT(c);
 169        capa_cpy(c, oc);
 170        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
 171        DEBUG_CAPA(D_SEC, c, "pack");
 172}
 173
 174static inline void osc_pack_req_body(struct ptlrpc_request *req,
 175                                     struct obd_info *oinfo)
 176{
 177        struct ost_body *body;
 178
 179        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 180        LASSERT(body);
 181
 182        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
 183                             oinfo->oi_oa);
 184        osc_pack_capa(req, body, oinfo->oi_capa);
 185}
 186
 187static inline void osc_set_capa_size(struct ptlrpc_request *req,
 188                                     const struct req_msg_field *field,
 189                                     struct obd_capa *oc)
 190{
 191        if (oc == NULL)
 192                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
 193        else
 194                /* it is already calculated as sizeof struct obd_capa */
 195                ;
 196}
 197
 198static int osc_getattr_interpret(const struct lu_env *env,
 199                                 struct ptlrpc_request *req,
 200                                 struct osc_async_args *aa, int rc)
 201{
 202        struct ost_body *body;
 203
 204        if (rc != 0)
 205                GOTO(out, rc);
 206
 207        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 208        if (body) {
 209                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
 210                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
 211                                     aa->aa_oi->oi_oa, &body->oa);
 212
 213                /* This should really be sent by the OST */
 214                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
 215                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
 216        } else {
 217                CDEBUG(D_INFO, "can't unpack ost_body\n");
 218                rc = -EPROTO;
 219                aa->aa_oi->oi_oa->o_valid = 0;
 220        }
 221out:
 222        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
 223        return rc;
 224}
 225
 226static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
 227                             struct ptlrpc_request_set *set)
 228{
 229        struct ptlrpc_request *req;
 230        struct osc_async_args *aa;
 231        int                 rc;
 232
 233        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
 234        if (req == NULL)
 235                return -ENOMEM;
 236
 237        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 238        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
 239        if (rc) {
 240                ptlrpc_request_free(req);
 241                return rc;
 242        }
 243
 244        osc_pack_req_body(req, oinfo);
 245
 246        ptlrpc_request_set_replen(req);
 247        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
 248
 249        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
 250        aa = ptlrpc_req_async_args(req);
 251        aa->aa_oi = oinfo;
 252
 253        ptlrpc_set_add_req(set, req);
 254        return 0;
 255}
 256
 257static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
 258                       struct obd_info *oinfo)
 259{
 260        struct ptlrpc_request *req;
 261        struct ost_body       *body;
 262        int                 rc;
 263
 264        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
 265        if (req == NULL)
 266                return -ENOMEM;
 267
 268        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 269        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
 270        if (rc) {
 271                ptlrpc_request_free(req);
 272                return rc;
 273        }
 274
 275        osc_pack_req_body(req, oinfo);
 276
 277        ptlrpc_request_set_replen(req);
 278
 279        rc = ptlrpc_queue_wait(req);
 280        if (rc)
 281                GOTO(out, rc);
 282
 283        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 284        if (body == NULL)
 285                GOTO(out, rc = -EPROTO);
 286
 287        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
 288        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
 289                             &body->oa);
 290
 291        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
 292        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
 293
 294 out:
 295        ptlrpc_req_finished(req);
 296        return rc;
 297}
 298
 299static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
 300                       struct obd_info *oinfo, struct obd_trans_info *oti)
 301{
 302        struct ptlrpc_request *req;
 303        struct ost_body       *body;
 304        int                 rc;
 305
 306        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
 307
 308        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
 309        if (req == NULL)
 310                return -ENOMEM;
 311
 312        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 313        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
 314        if (rc) {
 315                ptlrpc_request_free(req);
 316                return rc;
 317        }
 318
 319        osc_pack_req_body(req, oinfo);
 320
 321        ptlrpc_request_set_replen(req);
 322
 323        rc = ptlrpc_queue_wait(req);
 324        if (rc)
 325                GOTO(out, rc);
 326
 327        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 328        if (body == NULL)
 329                GOTO(out, rc = -EPROTO);
 330
 331        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
 332                             &body->oa);
 333
 334out:
 335        ptlrpc_req_finished(req);
 336        return rc;
 337}
 338
 339static int osc_setattr_interpret(const struct lu_env *env,
 340                                 struct ptlrpc_request *req,
 341                                 struct osc_setattr_args *sa, int rc)
 342{
 343        struct ost_body *body;
 344
 345        if (rc != 0)
 346                GOTO(out, rc);
 347
 348        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 349        if (body == NULL)
 350                GOTO(out, rc = -EPROTO);
 351
 352        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
 353                             &body->oa);
 354out:
 355        rc = sa->sa_upcall(sa->sa_cookie, rc);
 356        return rc;
 357}
 358
 359int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
 360                           struct obd_trans_info *oti,
 361                           obd_enqueue_update_f upcall, void *cookie,
 362                           struct ptlrpc_request_set *rqset)
 363{
 364        struct ptlrpc_request   *req;
 365        struct osc_setattr_args *sa;
 366        int                   rc;
 367
 368        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
 369        if (req == NULL)
 370                return -ENOMEM;
 371
 372        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 373        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
 374        if (rc) {
 375                ptlrpc_request_free(req);
 376                return rc;
 377        }
 378
 379        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
 380                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
 381
 382        osc_pack_req_body(req, oinfo);
 383
 384        ptlrpc_request_set_replen(req);
 385
 386        /* do mds to ost setattr asynchronously */
 387        if (!rqset) {
 388                /* Do not wait for response. */
 389                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 390        } else {
 391                req->rq_interpret_reply =
 392                        (ptlrpc_interpterer_t)osc_setattr_interpret;
 393
 394                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
 395                sa = ptlrpc_req_async_args(req);
 396                sa->sa_oa = oinfo->oi_oa;
 397                sa->sa_upcall = upcall;
 398                sa->sa_cookie = cookie;
 399
 400                if (rqset == PTLRPCD_SET)
 401                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 402                else
 403                        ptlrpc_set_add_req(rqset, req);
 404        }
 405
 406        return 0;
 407}
 408
 409static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
 410                             struct obd_trans_info *oti,
 411                             struct ptlrpc_request_set *rqset)
 412{
 413        return osc_setattr_async_base(exp, oinfo, oti,
 414                                      oinfo->oi_cb_up, oinfo, rqset);
 415}
 416
 417int osc_real_create(struct obd_export *exp, struct obdo *oa,
 418                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
 419{
 420        struct ptlrpc_request *req;
 421        struct ost_body       *body;
 422        struct lov_stripe_md  *lsm;
 423        int                 rc;
 424
 425        LASSERT(oa);
 426        LASSERT(ea);
 427
 428        lsm = *ea;
 429        if (!lsm) {
 430                rc = obd_alloc_memmd(exp, &lsm);
 431                if (rc < 0)
 432                        return rc;
 433        }
 434
 435        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
 436        if (req == NULL)
 437                GOTO(out, rc = -ENOMEM);
 438
 439        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
 440        if (rc) {
 441                ptlrpc_request_free(req);
 442                GOTO(out, rc);
 443        }
 444
 445        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 446        LASSERT(body);
 447
 448        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 449
 450        ptlrpc_request_set_replen(req);
 451
 452        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
 453            oa->o_flags == OBD_FL_DELORPHAN) {
 454                DEBUG_REQ(D_HA, req,
 455                          "delorphan from OST integration");
 456                /* Don't resend the delorphan req */
 457                req->rq_no_resend = req->rq_no_delay = 1;
 458        }
 459
 460        rc = ptlrpc_queue_wait(req);
 461        if (rc)
 462                GOTO(out_req, rc);
 463
 464        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 465        if (body == NULL)
 466                GOTO(out_req, rc = -EPROTO);
 467
 468        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
 469        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
 470
 471        oa->o_blksize = cli_brw_size(exp->exp_obd);
 472        oa->o_valid |= OBD_MD_FLBLKSZ;
 473
 474        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
 475         * have valid lsm_oinfo data structs, so don't go touching that.
 476         * This needs to be fixed in a big way.
 477         */
 478        lsm->lsm_oi = oa->o_oi;
 479        *ea = lsm;
 480
 481        if (oti != NULL) {
 482                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
 483
 484                if (oa->o_valid & OBD_MD_FLCOOKIE) {
 485                        if (!oti->oti_logcookies)
 486                                oti_alloc_cookies(oti, 1);
 487                        *oti->oti_logcookies = oa->o_lcookie;
 488                }
 489        }
 490
 491        CDEBUG(D_HA, "transno: "LPD64"\n",
 492               lustre_msg_get_transno(req->rq_repmsg));
 493out_req:
 494        ptlrpc_req_finished(req);
 495out:
 496        if (rc && !*ea)
 497                obd_free_memmd(exp, &lsm);
 498        return rc;
 499}
 500
 501int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
 502                   obd_enqueue_update_f upcall, void *cookie,
 503                   struct ptlrpc_request_set *rqset)
 504{
 505        struct ptlrpc_request   *req;
 506        struct osc_setattr_args *sa;
 507        struct ost_body  *body;
 508        int                   rc;
 509
 510        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
 511        if (req == NULL)
 512                return -ENOMEM;
 513
 514        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 515        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
 516        if (rc) {
 517                ptlrpc_request_free(req);
 518                return rc;
 519        }
 520        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 521        ptlrpc_at_set_req_timeout(req);
 522
 523        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 524        LASSERT(body);
 525        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
 526                             oinfo->oi_oa);
 527        osc_pack_capa(req, body, oinfo->oi_capa);
 528
 529        ptlrpc_request_set_replen(req);
 530
 531        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
 532        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
 533        sa = ptlrpc_req_async_args(req);
 534        sa->sa_oa     = oinfo->oi_oa;
 535        sa->sa_upcall = upcall;
 536        sa->sa_cookie = cookie;
 537        if (rqset == PTLRPCD_SET)
 538                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 539        else
 540                ptlrpc_set_add_req(rqset, req);
 541
 542        return 0;
 543}
 544
 545static int osc_punch(const struct lu_env *env, struct obd_export *exp,
 546                     struct obd_info *oinfo, struct obd_trans_info *oti,
 547                     struct ptlrpc_request_set *rqset)
 548{
 549        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
 550        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
 551        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 552        return osc_punch_base(exp, oinfo,
 553                              oinfo->oi_cb_up, oinfo, rqset);
 554}
 555
 556static int osc_sync_interpret(const struct lu_env *env,
 557                              struct ptlrpc_request *req,
 558                              void *arg, int rc)
 559{
 560        struct osc_fsync_args *fa = arg;
 561        struct ost_body *body;
 562
 563        if (rc)
 564                GOTO(out, rc);
 565
 566        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 567        if (body == NULL) {
 568                CERROR ("can't unpack ost_body\n");
 569                GOTO(out, rc = -EPROTO);
 570        }
 571
 572        *fa->fa_oi->oi_oa = body->oa;
 573out:
 574        rc = fa->fa_upcall(fa->fa_cookie, rc);
 575        return rc;
 576}
 577
 578int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
 579                  obd_enqueue_update_f upcall, void *cookie,
 580                  struct ptlrpc_request_set *rqset)
 581{
 582        struct ptlrpc_request *req;
 583        struct ost_body       *body;
 584        struct osc_fsync_args *fa;
 585        int                 rc;
 586
 587        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
 588        if (req == NULL)
 589                return -ENOMEM;
 590
 591        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 592        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
 593        if (rc) {
 594                ptlrpc_request_free(req);
 595                return rc;
 596        }
 597
 598        /* overload the size and blocks fields in the oa with start/end */
 599        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 600        LASSERT(body);
 601        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
 602                             oinfo->oi_oa);
 603        osc_pack_capa(req, body, oinfo->oi_capa);
 604
 605        ptlrpc_request_set_replen(req);
 606        req->rq_interpret_reply = osc_sync_interpret;
 607
 608        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
 609        fa = ptlrpc_req_async_args(req);
 610        fa->fa_oi = oinfo;
 611        fa->fa_upcall = upcall;
 612        fa->fa_cookie = cookie;
 613
 614        if (rqset == PTLRPCD_SET)
 615                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 616        else
 617                ptlrpc_set_add_req(rqset, req);
 618
 619        return 0;
 620}
 621
 622static int osc_sync(const struct lu_env *env, struct obd_export *exp,
 623                    struct obd_info *oinfo, obd_size start, obd_size end,
 624                    struct ptlrpc_request_set *set)
 625{
 626        if (!oinfo->oi_oa) {
 627                CDEBUG(D_INFO, "oa NULL\n");
 628                return -EINVAL;
 629        }
 630
 631        oinfo->oi_oa->o_size = start;
 632        oinfo->oi_oa->o_blocks = end;
 633        oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 634
 635        return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
 636}
 637
 638/* Find and cancel locally locks matched by @mode in the resource found by
 639 * @objid. Found locks are added into @cancel list. Returns the amount of
 640 * locks added to @cancels list. */
 641static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
 642                                   struct list_head *cancels,
 643                                   ldlm_mode_t mode, int lock_flags)
 644{
 645        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
 646        struct ldlm_res_id res_id;
 647        struct ldlm_resource *res;
 648        int count;
 649
 650        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
 651         * export) but disabled through procfs (flag in NS).
 652         *
 653         * This distinguishes from a case when ELC is not supported originally,
 654         * when we still want to cancel locks in advance and just cancel them
 655         * locally, without sending any RPC. */
 656        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
 657                return 0;
 658
 659        ostid_build_res_name(&oa->o_oi, &res_id);
 660        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
 661        if (res == NULL)
 662                return 0;
 663
 664        LDLM_RESOURCE_ADDREF(res);
 665        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
 666                                           lock_flags, 0, NULL);
 667        LDLM_RESOURCE_DELREF(res);
 668        ldlm_resource_putref(res);
 669        return count;
 670}
 671
 672static int osc_destroy_interpret(const struct lu_env *env,
 673                                 struct ptlrpc_request *req, void *data,
 674                                 int rc)
 675{
 676        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
 677
 678        atomic_dec(&cli->cl_destroy_in_flight);
 679        wake_up(&cli->cl_destroy_waitq);
 680        return 0;
 681}
 682
 683static int osc_can_send_destroy(struct client_obd *cli)
 684{
 685        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
 686            cli->cl_max_rpcs_in_flight) {
 687                /* The destroy request can be sent */
 688                return 1;
 689        }
 690        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
 691            cli->cl_max_rpcs_in_flight) {
 692                /*
 693                 * The counter has been modified between the two atomic
 694                 * operations.
 695                 */
 696                wake_up(&cli->cl_destroy_waitq);
 697        }
 698        return 0;
 699}
 700
 701int osc_create(const struct lu_env *env, struct obd_export *exp,
 702               struct obdo *oa, struct lov_stripe_md **ea,
 703               struct obd_trans_info *oti)
 704{
 705        int rc = 0;
 706
 707        LASSERT(oa);
 708        LASSERT(ea);
 709        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 710
 711        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
 712            oa->o_flags == OBD_FL_RECREATE_OBJS) {
 713                return osc_real_create(exp, oa, ea, oti);
 714        }
 715
 716        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
 717                return osc_real_create(exp, oa, ea, oti);
 718
 719        /* we should not get here anymore */
 720        LBUG();
 721
 722        return rc;
 723}
 724
 725/* Destroy requests can be async always on the client, and we don't even really
 726 * care about the return code since the client cannot do anything at all about
 727 * a destroy failure.
 728 * When the MDS is unlinking a filename, it saves the file objects into a
 729 * recovery llog, and these object records are cancelled when the OST reports
 730 * they were destroyed and sync'd to disk (i.e. transaction committed).
 731 * If the client dies, or the OST is down when the object should be destroyed,
 732 * the records are not cancelled, and when the OST reconnects to the MDS next,
 733 * it will retrieve the llog unlink logs and then sends the log cancellation
 734 * cookies to the MDS after committing destroy transactions. */
 735static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
 736                       struct obdo *oa, struct lov_stripe_md *ea,
 737                       struct obd_trans_info *oti, struct obd_export *md_export,
 738                       void *capa)
 739{
 740        struct client_obd     *cli = &exp->exp_obd->u.cli;
 741        struct ptlrpc_request *req;
 742        struct ost_body       *body;
 743        LIST_HEAD(cancels);
 744        int rc, count;
 745
 746        if (!oa) {
 747                CDEBUG(D_INFO, "oa NULL\n");
 748                return -EINVAL;
 749        }
 750
 751        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
 752                                        LDLM_FL_DISCARD_DATA);
 753
 754        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
 755        if (req == NULL) {
 756                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 757                return -ENOMEM;
 758        }
 759
 760        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
 761        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
 762                               0, &cancels, count);
 763        if (rc) {
 764                ptlrpc_request_free(req);
 765                return rc;
 766        }
 767
 768        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 769        ptlrpc_at_set_req_timeout(req);
 770
 771        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
 772                oa->o_lcookie = *oti->oti_logcookies;
 773        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 774        LASSERT(body);
 775        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 776
 777        osc_pack_capa(req, body, (struct obd_capa *)capa);
 778        ptlrpc_request_set_replen(req);
 779
 780        /* If osc_destory is for destroying the unlink orphan,
 781         * sent from MDT to OST, which should not be blocked here,
 782         * because the process might be triggered by ptlrpcd, and
 783         * it is not good to block ptlrpcd thread (b=16006)*/
 784        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
 785                req->rq_interpret_reply = osc_destroy_interpret;
 786                if (!osc_can_send_destroy(cli)) {
 787                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
 788                                                          NULL);
 789
 790                        /*
 791                         * Wait until the number of on-going destroy RPCs drops
 792                         * under max_rpc_in_flight
 793                         */
 794                        l_wait_event_exclusive(cli->cl_destroy_waitq,
 795                                               osc_can_send_destroy(cli), &lwi);
 796                }
 797        }
 798
 799        /* Do not wait for response */
 800        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 801        return 0;
 802}
 803
 804static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 805                                long writing_bytes)
 806{
 807        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
 808
 809        LASSERT(!(oa->o_valid & bits));
 810
 811        oa->o_valid |= bits;
 812        client_obd_list_lock(&cli->cl_loi_list_lock);
 813        oa->o_dirty = cli->cl_dirty;
 814        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
 815                     cli->cl_dirty_max)) {
 816                CERROR("dirty %lu - %lu > dirty_max %lu\n",
 817                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
 818                oa->o_undirty = 0;
 819        } else if (unlikely(atomic_read(&obd_dirty_pages) -
 820                            atomic_read(&obd_dirty_transit_pages) >
 821                            (long)(obd_max_dirty_pages + 1))) {
 822                /* The atomic_read() allowing the atomic_inc() are
 823                 * not covered by a lock thus they may safely race and trip
 824                 * this CERROR() unless we add in a small fudge factor (+1). */
 825                CERROR("dirty %d - %d > system dirty_max %d\n",
 826                       atomic_read(&obd_dirty_pages),
 827                       atomic_read(&obd_dirty_transit_pages),
 828                       obd_max_dirty_pages);
 829                oa->o_undirty = 0;
 830        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
 831                CERROR("dirty %lu - dirty_max %lu too big???\n",
 832                       cli->cl_dirty, cli->cl_dirty_max);
 833                oa->o_undirty = 0;
 834        } else {
 835                long max_in_flight = (cli->cl_max_pages_per_rpc <<
 836                                      PAGE_CACHE_SHIFT)*
 837                                     (cli->cl_max_rpcs_in_flight + 1);
 838                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
 839        }
 840        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
 841        oa->o_dropped = cli->cl_lost_grant;
 842        cli->cl_lost_grant = 0;
 843        client_obd_list_unlock(&cli->cl_loi_list_lock);
 844        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
 845               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
 846
 847}
 848
 849void osc_update_next_shrink(struct client_obd *cli)
 850{
 851        cli->cl_next_shrink_grant =
 852                cfs_time_shift(cli->cl_grant_shrink_interval);
 853        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
 854               cli->cl_next_shrink_grant);
 855}
 856
 857static void __osc_update_grant(struct client_obd *cli, obd_size grant)
 858{
 859        client_obd_list_lock(&cli->cl_loi_list_lock);
 860        cli->cl_avail_grant += grant;
 861        client_obd_list_unlock(&cli->cl_loi_list_lock);
 862}
 863
 864static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 865{
 866        if (body->oa.o_valid & OBD_MD_FLGRANT) {
 867                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
 868                __osc_update_grant(cli, body->oa.o_grant);
 869        }
 870}
 871
 872static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 873                              obd_count keylen, void *key, obd_count vallen,
 874                              void *val, struct ptlrpc_request_set *set);
 875
 876static int osc_shrink_grant_interpret(const struct lu_env *env,
 877                                      struct ptlrpc_request *req,
 878                                      void *aa, int rc)
 879{
 880        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
 881        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
 882        struct ost_body *body;
 883
 884        if (rc != 0) {
 885                __osc_update_grant(cli, oa->o_grant);
 886                GOTO(out, rc);
 887        }
 888
 889        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 890        LASSERT(body);
 891        osc_update_grant(cli, body);
 892out:
 893        OBDO_FREE(oa);
 894        return rc;
 895}
 896
 897static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
 898{
 899        client_obd_list_lock(&cli->cl_loi_list_lock);
 900        oa->o_grant = cli->cl_avail_grant / 4;
 901        cli->cl_avail_grant -= oa->o_grant;
 902        client_obd_list_unlock(&cli->cl_loi_list_lock);
 903        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
 904                oa->o_valid |= OBD_MD_FLFLAGS;
 905                oa->o_flags = 0;
 906        }
 907        oa->o_flags |= OBD_FL_SHRINK_GRANT;
 908        osc_update_next_shrink(cli);
 909}
 910
 911/* Shrink the current grant, either from some large amount to enough for a
 912 * full set of in-flight RPCs, or if we have already shrunk to that limit
 913 * then to enough for a single RPC.  This avoids keeping more grant than
 914 * needed, and avoids shrinking the grant piecemeal. */
 915static int osc_shrink_grant(struct client_obd *cli)
 916{
 917        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
 918                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
 919
 920        client_obd_list_lock(&cli->cl_loi_list_lock);
 921        if (cli->cl_avail_grant <= target_bytes)
 922                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
 923        client_obd_list_unlock(&cli->cl_loi_list_lock);
 924
 925        return osc_shrink_grant_to_target(cli, target_bytes);
 926}
 927
 928int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 929{
 930        int                     rc = 0;
 931        struct ost_body *body;
 932
 933        client_obd_list_lock(&cli->cl_loi_list_lock);
 934        /* Don't shrink if we are already above or below the desired limit
 935         * We don't want to shrink below a single RPC, as that will negatively
 936         * impact block allocation and long-term performance. */
 937        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
 938                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
 939
 940        if (target_bytes >= cli->cl_avail_grant) {
 941                client_obd_list_unlock(&cli->cl_loi_list_lock);
 942                return 0;
 943        }
 944        client_obd_list_unlock(&cli->cl_loi_list_lock);
 945
 946        OBD_ALLOC_PTR(body);
 947        if (!body)
 948                return -ENOMEM;
 949
 950        osc_announce_cached(cli, &body->oa, 0);
 951
 952        client_obd_list_lock(&cli->cl_loi_list_lock);
 953        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
 954        cli->cl_avail_grant = target_bytes;
 955        client_obd_list_unlock(&cli->cl_loi_list_lock);
 956        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
 957                body->oa.o_valid |= OBD_MD_FLFLAGS;
 958                body->oa.o_flags = 0;
 959        }
 960        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
 961        osc_update_next_shrink(cli);
 962
 963        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
 964                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
 965                                sizeof(*body), body, NULL);
 966        if (rc != 0)
 967                __osc_update_grant(cli, body->oa.o_grant);
 968        OBD_FREE_PTR(body);
 969        return rc;
 970}
 971
 972static int osc_should_shrink_grant(struct client_obd *client)
 973{
 974        cfs_time_t time = cfs_time_current();
 975        cfs_time_t next_shrink = client->cl_next_shrink_grant;
 976
 977        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
 978             OBD_CONNECT_GRANT_SHRINK) == 0)
 979                return 0;
 980
 981        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
 982                /* Get the current RPC size directly, instead of going via:
 983                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
 984                 * Keep comment here so that it can be found by searching. */
 985                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
 986
 987                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
 988                    client->cl_avail_grant > brw_size)
 989                        return 1;
 990                else
 991                        osc_update_next_shrink(client);
 992        }
 993        return 0;
 994}
 995
 996static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
 997{
 998        struct client_obd *client;
 999
1000        list_for_each_entry(client, &item->ti_obd_list,
1001                                cl_grant_shrink_list) {
1002                if (osc_should_shrink_grant(client))
1003                        osc_shrink_grant(client);
1004        }
1005        return 0;
1006}
1007
1008static int osc_add_shrink_grant(struct client_obd *client)
1009{
1010        int rc;
1011
1012        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1013                                       TIMEOUT_GRANT,
1014                                       osc_grant_shrink_grant_cb, NULL,
1015                                       &client->cl_grant_shrink_list);
1016        if (rc) {
1017                CERROR("add grant client %s error %d\n",
1018                        client->cl_import->imp_obd->obd_name, rc);
1019                return rc;
1020        }
1021        CDEBUG(D_CACHE, "add grant client %s \n",
1022               client->cl_import->imp_obd->obd_name);
1023        osc_update_next_shrink(client);
1024        return 0;
1025}
1026
1027static int osc_del_shrink_grant(struct client_obd *client)
1028{
1029        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1030                                         TIMEOUT_GRANT);
1031}
1032
1033static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1034{
1035        /*
1036         * ocd_grant is the total grant amount we're expect to hold: if we've
1037         * been evicted, it's the new avail_grant amount, cl_dirty will drop
1038         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1039         *
1040         * race is tolerable here: if we're evicted, but imp_state already
1041         * left EVICTED state, then cl_dirty must be 0 already.
1042         */
1043        client_obd_list_lock(&cli->cl_loi_list_lock);
1044        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1045                cli->cl_avail_grant = ocd->ocd_grant;
1046        else
1047                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1048
1049        if (cli->cl_avail_grant < 0) {
1050                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1051                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1052                      ocd->ocd_grant, cli->cl_dirty);
1053                /* workaround for servers which do not have the patch from
1054                 * LU-2679 */
1055                cli->cl_avail_grant = ocd->ocd_grant;
1056        }
1057
1058        /* determine the appropriate chunk size used by osc_extent. */
1059        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1060        client_obd_list_unlock(&cli->cl_loi_list_lock);
1061
1062        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1063                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1064                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1065
1066        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1067            list_empty(&cli->cl_grant_shrink_list))
1068                osc_add_shrink_grant(cli);
1069}
1070
1071/* We assume that the reason this OSC got a short read is because it read
1072 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1073 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1074 * this stripe never got written at or beyond this stripe offset yet. */
1075static void handle_short_read(int nob_read, obd_count page_count,
1076                              struct brw_page **pga)
1077{
1078        char *ptr;
1079        int i = 0;
1080
1081        /* skip bytes read OK */
1082        while (nob_read > 0) {
1083                LASSERT (page_count > 0);
1084
1085                if (pga[i]->count > nob_read) {
1086                        /* EOF inside this page */
1087                        ptr = kmap(pga[i]->pg) +
1088                                (pga[i]->off & ~CFS_PAGE_MASK);
1089                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1090                        kunmap(pga[i]->pg);
1091                        page_count--;
1092                        i++;
1093                        break;
1094                }
1095
1096                nob_read -= pga[i]->count;
1097                page_count--;
1098                i++;
1099        }
1100
1101        /* zero remaining pages */
1102        while (page_count-- > 0) {
1103                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1104                memset(ptr, 0, pga[i]->count);
1105                kunmap(pga[i]->pg);
1106                i++;
1107        }
1108}
1109
1110static int check_write_rcs(struct ptlrpc_request *req,
1111                           int requested_nob, int niocount,
1112                           obd_count page_count, struct brw_page **pga)
1113{
1114        int     i;
1115        __u32   *remote_rcs;
1116
1117        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1118                                                  sizeof(*remote_rcs) *
1119                                                  niocount);
1120        if (remote_rcs == NULL) {
1121                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1122                return(-EPROTO);
1123        }
1124
1125        /* return error if any niobuf was in error */
1126        for (i = 0; i < niocount; i++) {
1127                if ((int)remote_rcs[i] < 0)
1128                        return(remote_rcs[i]);
1129
1130                if (remote_rcs[i] != 0) {
1131                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1132                                i, remote_rcs[i], req);
1133                        return(-EPROTO);
1134                }
1135        }
1136
1137        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1138                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1139                       req->rq_bulk->bd_nob_transferred, requested_nob);
1140                return(-EPROTO);
1141        }
1142
1143        return (0);
1144}
1145
1146static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1147{
1148        if (p1->flag != p2->flag) {
1149                unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1150                                  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1151
1152                /* warn if we try to combine flags that we don't know to be
1153                 * safe to combine */
1154                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1155                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1156                              "report this at http://bugs.whamcloud.com/\n",
1157                              p1->flag, p2->flag);
1158                }
1159                return 0;
1160        }
1161
1162        return (p1->off + p1->count == p2->off);
1163}
1164
1165static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1166                                   struct brw_page **pga, int opc,
1167                                   cksum_type_t cksum_type)
1168{
1169        __u32                           cksum;
1170        int                             i = 0;
1171        struct cfs_crypto_hash_desc     *hdesc;
1172        unsigned int                    bufsize;
1173        int                             err;
1174        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1175
1176        LASSERT(pg_count > 0);
1177
1178        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1179        if (IS_ERR(hdesc)) {
1180                CERROR("Unable to initialize checksum hash %s\n",
1181                       cfs_crypto_hash_name(cfs_alg));
1182                return PTR_ERR(hdesc);
1183        }
1184
1185        while (nob > 0 && pg_count > 0) {
1186                int count = pga[i]->count > nob ? nob : pga[i]->count;
1187
1188                /* corrupt the data before we compute the checksum, to
1189                 * simulate an OST->client data error */
1190                if (i == 0 && opc == OST_READ &&
1191                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1192                        unsigned char *ptr = kmap(pga[i]->pg);
1193                        int off = pga[i]->off & ~CFS_PAGE_MASK;
1194                        memcpy(ptr + off, "bad1", min(4, nob));
1195                        kunmap(pga[i]->pg);
1196                }
1197                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1198                                  pga[i]->off & ~CFS_PAGE_MASK,
1199                                  count);
1200                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1201                               (int)(pga[i]->off & ~CFS_PAGE_MASK));
1202
1203                nob -= pga[i]->count;
1204                pg_count--;
1205                i++;
1206        }
1207
1208        bufsize = 4;
1209        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1210
1211        if (err)
1212                cfs_crypto_hash_final(hdesc, NULL, NULL);
1213
1214        /* For sending we only compute the wrong checksum instead
1215         * of corrupting the data so it is still correct on a redo */
1216        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1217                cksum++;
1218
1219        return cksum;
1220}
1221
/*
 * Build a bulk read/write (BRW) request for the @page_count pages in @pga.
 *
 * An OST_WRITE request is allocated from the import's request pool when
 * OBD_BRW_WRITE is set in @cmd, otherwise an OST_READ request is allocated.
 * The function sizes and packs the request, attaches a bulk descriptor,
 * adds every page to it, fills the body, ioobj and remote niobufs, reports
 * cache/grant state via osc_announce_cached(), sets the checksum flags
 * (and, for writes, computes the checksum) when cl_checksum is enabled and
 * the security flavor has no bulk protection, and stores the async
 * arguments in the request.
 *
 * @oa      obdo copied into the request body; for writes its checksum
 *          flags and o_cksum are updated as well
 * @lsm     not referenced in this function
 * @reqp    receives the prepared request on success
 * @ocapa   capability to pack; a reference is kept in aa_ocapa only when
 *          @reserve is also non-zero
 * @resend  when non-zero, OBD_FL_RECOV_RESEND is set in the body flags
 *
 * Returns 0 on success.  Returns -ENOMEM or -EINVAL when the corresponding
 * fail-injection check fires, -ENOMEM when the request or the bulk
 * descriptor cannot be allocated, or the error from ptlrpc_request_pack().
 * The request is released on every failure after its allocation.
 */
1222static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1223                                struct lov_stripe_md *lsm, obd_count page_count,
1224                                struct brw_page **pga,
1225                                struct ptlrpc_request **reqp,
1226                                struct obd_capa *ocapa, int reserve,
1227                                int resend)
1228{
1229        struct ptlrpc_request   *req;
1230        struct ptlrpc_bulk_desc *desc;
1231        struct ost_body  *body;
1232        struct obd_ioobj        *ioobj;
1233        struct niobuf_remote    *niobuf;
1234        int niocount, i, requested_nob, opc, rc;
1235        struct osc_brw_async_args *aa;
1236        struct req_capsule      *pill;
1237        struct brw_page *pg_prev;
1238
1239        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1240                return -ENOMEM; /* Recoverable */
1241        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1242                return -EINVAL; /* Fatal */
1243
1244        if ((cmd & OBD_BRW_WRITE) != 0) {
1245                opc = OST_WRITE;
1246                req = ptlrpc_request_alloc_pool(cli->cl_import,
1247                                                cli->cl_import->imp_rq_pool,
1248                                                &RQF_OST_BRW_WRITE);
1249        } else {
1250                opc = OST_READ;
1251                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1252        }
1253        if (req == NULL)
1254                return -ENOMEM;
1255
            /* One remote niobuf is needed for each run of pages that
             * can_merge_pages() allows to be merged. */
1256        for (niocount = i = 1; i < page_count; i++) {
1257                if (!can_merge_pages(pga[i - 1], pga[i]))
1258                        niocount++;
1259        }
1260
1261        pill = &req->rq_pill;
1262        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1263                             sizeof(*ioobj));
1264        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1265                             niocount * sizeof(*niobuf));
1266        osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1267
1268        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1269        if (rc) {
1270                ptlrpc_request_free(req);
1271                return rc;
1272        }
1273        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1274        ptlrpc_at_set_req_timeout(req);
1275        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1276         * retry logic */
1277        req->rq_no_retry_einprogress = 1;
1278
1279        desc = ptlrpc_prep_bulk_imp(req, page_count,
1280                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1281                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1282                OST_BULK_PORTAL);
1283
1284        if (desc == NULL)
1285                GOTO(out, rc = -ENOMEM);
1286        /* NB request now owns desc and will free it when it gets freed */
1287
1288        body = req_capsule_client_get(pill, &RMF_OST_BODY);
1289        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1290        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1291        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1292
1293        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1294
1295        obdo_to_ioobj(oa, ioobj);
1296        ioobj->ioo_bufcnt = niocount;
1297        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1298         * that might be send for this request.  The actual number is decided
1299         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1300         * "max - 1" for old client compatibility sending "0", and also so the
1301         * the actual maximum is a power-of-two number, not one less. LU-1431 */
1302        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1303        osc_pack_capa(req, body, ocapa);
1304        LASSERT(page_count > 0);
1305        pg_prev = pga[0];
            /* Add every page to the bulk descriptor.  A page that can be
             * merged with the previous one extends the current niobuf;
             * any other page starts a new niobuf. */
1306        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1307                struct brw_page *pg = pga[i];
1308                int poff = pg->off & ~CFS_PAGE_MASK;
1309
1310                LASSERT(pg->count > 0);
1311                /* make sure there is no gap in the middle of page array */
1312                LASSERTF(page_count == 1 ||
1313                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1314                          ergo(i > 0 && i < page_count - 1,
1315                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1316                          ergo(i == page_count - 1, poff == 0)),
1317                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1318                         i, page_count, pg, pg->off, pg->count);
1319                LASSERTF(i == 0 || pg->off > pg_prev->off,
1320                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1321                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1322                         i, page_count,
1323                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1324                         pg_prev->pg, page_private(pg_prev->pg),
1325                         pg_prev->pg->index, pg_prev->off);
1326                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1327                        (pg->flag & OBD_BRW_SRVLOCK));
1328
1329                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1330                requested_nob += pg->count;
1331
1332                if (i > 0 && can_merge_pages(pg_prev, pg)) {
1333                        niobuf--;
1334                        niobuf->len += pg->count;
1335                } else {
1336                        niobuf->offset = pg->off;
1337                        niobuf->len    = pg->count;
1338                        niobuf->flags  = pg->flag;
1339                }
1340                pg_prev = pg;
1341        }
1342
            /* niobuf must now point exactly niocount entries past the
             * start of the niobuf array in the request */
1343        LASSERTF((void *)(niobuf - niocount) ==
1344                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1345                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1346                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1347
1348        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1349        if (resend) {
1350                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1351                        body->oa.o_valid |= OBD_MD_FLFLAGS;
1352                        body->oa.o_flags = 0;
1353                }
1354                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1355        }
1356
            /* piggy-back a grant shrink on this request when one is due */
1357        if (osc_should_shrink_grant(cli))
1358                osc_shrink_grant_local(cli, &body->oa);
1359
1360        /* size[REQ_REC_OFF] still sizeof (*body) */
1361        if (opc == OST_WRITE) {
1362                if (cli->cl_checksum &&
1363                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1364                        /* store cl_cksum_type in a local variable since
1365                         * it can be changed via lprocfs */
1366                        cksum_type_t cksum_type = cli->cl_cksum_type;
1367
1368                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1369                                oa->o_flags &= OBD_FL_LOCAL_MASK;
1370                                body->oa.o_flags = 0;
1371                        }
1372                        body->oa.o_flags |= cksum_type_pack(cksum_type);
1373                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1374                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1375                                                             page_count, pga,
1376                                                             OST_WRITE,
1377                                                             cksum_type);
1378                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1379                               body->oa.o_cksum);
1380                        /* save this in 'oa', too, for later checking */
1381                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1382                        oa->o_flags |= cksum_type_pack(cksum_type);
1383                } else {
1384                        /* clear out the checksum flag, in case this is a
1385                         * resend but cl_checksum is no longer set. b=11238 */
1386                        oa->o_valid &= ~OBD_MD_FLCKSUM;
1387                }
1388                oa->o_cksum = body->oa.o_cksum;
1389                /* 1 RC per niobuf */
1390                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1391                                     sizeof(__u32) * niocount);
1392        } else {
                    /* reads only set the checksum flags and type here; no
                     * checksum is computed in this branch */
1393                if (cli->cl_checksum &&
1394                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1395                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1396                                body->oa.o_flags = 0;
1397                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1398                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1399                }
1400        }
1401        ptlrpc_request_set_replen(req);
1402
            /* keep the request parameters in the request's async args;
             * osc_brw_fini_request() reads them back from there */
1403        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1404        aa = ptlrpc_req_async_args(req);
1405        aa->aa_oa = oa;
1406        aa->aa_requested_nob = requested_nob;
1407        aa->aa_nio_count = niocount;
1408        aa->aa_page_count = page_count;
1409        aa->aa_resends = 0;
1410        aa->aa_ppga = pga;
1411        aa->aa_cli = cli;
1412        INIT_LIST_HEAD(&aa->aa_oaps);
1413        if (ocapa && reserve)
1414                aa->aa_ocapa = capa_get(ocapa);
1415
1416        *reqp = req;
1417        return 0;
1418
1419 out:
1420        ptlrpc_req_finished(req);
1421        return rc;
1422}
1423
1424static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1425                                __u32 client_cksum, __u32 server_cksum, int nob,
1426                                obd_count page_count, struct brw_page **pga,
1427                                cksum_type_t client_cksum_type)
1428{
1429        __u32 new_cksum;
1430        char *msg;
1431        cksum_type_t cksum_type;
1432
1433        if (server_cksum == client_cksum) {
1434                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1435                return 0;
1436        }
1437
1438        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1439                                       oa->o_flags : 0);
1440        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1441                                      cksum_type);
1442
1443        if (cksum_type != client_cksum_type)
1444                msg = "the server did not use the checksum type specified in "
1445                      "the original request - likely a protocol problem";
1446        else if (new_cksum == server_cksum)
1447                msg = "changed on the client after we checksummed it - "
1448                      "likely false positive due to mmap IO (bug 11742)";
1449        else if (new_cksum == client_cksum)
1450                msg = "changed in transit before arrival at OST";
1451        else
1452                msg = "changed in transit AND doesn't match the original - "
1453                      "likely false positive due to mmap IO (bug 11742)";
1454
1455        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1456                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1457                           msg, libcfs_nid2str(peer->nid),
1458                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1459                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1460                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1461                           POSTID(&oa->o_oi), pga[0]->off,
1462                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
1463        CERROR("original client csum %x (type %x), server csum %x (type %x), "
1464               "client csum now %x\n", client_cksum, client_cksum_type,
1465               server_cksum, cksum_type, new_cksum);
1466        return 1;
1467}
1468
1469/* Note rc enters this function as number of bytes transferred */
1470static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1471{
1472        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1473        const lnet_process_id_t *peer =
1474                        &req->rq_import->imp_connection->c_peer;
1475        struct client_obd *cli = aa->aa_cli;
1476        struct ost_body *body;
1477        __u32 client_cksum = 0;
1478
1479        if (rc < 0 && rc != -EDQUOT) {
1480                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1481                return rc;
1482        }
1483
1484        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1485        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1486        if (body == NULL) {
1487                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1488                return -EPROTO;
1489        }
1490
1491        /* set/clear over quota flag for a uid/gid */
1492        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1493            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1494                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1495
1496                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1497                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1498                       body->oa.o_flags);
1499                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1500        }
1501
1502        osc_update_grant(cli, body);
1503
1504        if (rc < 0)
1505                return rc;
1506
1507        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1508                client_cksum = aa->aa_oa->o_cksum; /* save for later */
1509
1510        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1511                if (rc > 0) {
1512                        CERROR("Unexpected +ve rc %d\n", rc);
1513                        return -EPROTO;
1514                }
1515                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1516
1517                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1518                        return -EAGAIN;
1519
1520                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1521                    check_write_checksum(&body->oa, peer, client_cksum,
1522                                         body->oa.o_cksum, aa->aa_requested_nob,
1523                                         aa->aa_page_count, aa->aa_ppga,
1524                                         cksum_type_unpack(aa->aa_oa->o_flags)))
1525                        return -EAGAIN;
1526
1527                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1528                                     aa->aa_page_count, aa->aa_ppga);
1529                GOTO(out, rc);
1530        }
1531
1532        /* The rest of this function executes only for OST_READs */
1533
1534        /* if unwrap_bulk failed, return -EAGAIN to retry */
1535        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1536        if (rc < 0)
1537                GOTO(out, rc = -EAGAIN);
1538
1539        if (rc > aa->aa_requested_nob) {
1540                CERROR("Unexpected rc %d (%d requested)\n", rc,
1541                       aa->aa_requested_nob);
1542                return -EPROTO;
1543        }
1544
1545        if (rc != req->rq_bulk->bd_nob_transferred) {
1546                CERROR ("Unexpected rc %d (%d transferred)\n",
1547                        rc, req->rq_bulk->bd_nob_transferred);
1548                return (-EPROTO);
1549        }
1550
1551        if (rc < aa->aa_requested_nob)
1552                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1553
1554        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1555                static int cksum_counter;
1556                __u32      server_cksum = body->oa.o_cksum;
1557                char      *via;
1558                char      *router;
1559                cksum_type_t cksum_type;
1560
1561                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1562                                               body->oa.o_flags : 0);
1563                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1564                                                 aa->aa_ppga, OST_READ,
1565                                                 cksum_type);
1566
1567                if (peer->nid == req->rq_bulk->bd_sender) {
1568                        via = router = "";
1569                } else {
1570                        via = " via ";
1571                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
1572                }
1573
1574                if (server_cksum == ~0 && rc > 0) {
1575                        CERROR("Protocol error: server %s set the 'checksum' "
1576                               "bit, but didn't send a checksum.  Not fatal, "
1577                               "but please notify on http://bugs.whamcloud.com/\n",
1578                               libcfs_nid2str(peer->nid));
1579                } else if (server_cksum != client_cksum) {
1580                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1581                                           "%s%s%s inode "DFID" object "DOSTID
1582                                           " extent ["LPU64"-"LPU64"]\n",
1583                                           req->rq_import->imp_obd->obd_name,
1584                                           libcfs_nid2str(peer->nid),
1585                                           via, router,
1586                                           body->oa.o_valid & OBD_MD_FLFID ?
1587                                                body->oa.o_parent_seq : (__u64)0,
1588                                           body->oa.o_valid & OBD_MD_FLFID ?
1589                                                body->oa.o_parent_oid : 0,
1590                                           body->oa.o_valid & OBD_MD_FLFID ?
1591                                                body->oa.o_parent_ver : 0,
1592                                           POSTID(&body->oa.o_oi),
1593                                           aa->aa_ppga[0]->off,
1594                                           aa->aa_ppga[aa->aa_page_count-1]->off +
1595                                           aa->aa_ppga[aa->aa_page_count-1]->count -
1596                                                                        1);
1597                        CERROR("client %x, server %x, cksum_type %x\n",
1598                               client_cksum, server_cksum, cksum_type);
1599                        cksum_counter = 0;
1600                        aa->aa_oa->o_cksum = client_cksum;
1601                        rc = -EAGAIN;
1602                } else {
1603                        cksum_counter++;
1604                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1605                        rc = 0;
1606                }
1607        } else if (unlikely(client_cksum)) {
1608                static int cksum_missed;
1609
1610                cksum_missed++;
1611                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1612                        CERROR("Checksum %u requested from %s but not sent\n",
1613                               cksum_missed, libcfs_nid2str(peer->nid));
1614        } else {
1615                rc = 0;
1616        }
1617out:
1618        if (rc >= 0)
1619                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1620                                     aa->aa_oa, &body->oa);
1621
1622        return rc;
1623}
1624
1625static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1626                            struct lov_stripe_md *lsm,
1627                            obd_count page_count, struct brw_page **pga,
1628                            struct obd_capa *ocapa)
1629{
1630        struct ptlrpc_request *req;
1631        int                 rc;
1632        wait_queue_head_t           waitq;
1633        int                 generation, resends = 0;
1634        struct l_wait_info     lwi;
1635
1636        init_waitqueue_head(&waitq);
1637        generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1638
1639restart_bulk:
1640        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1641                                  page_count, pga, &req, ocapa, 0, resends);
1642        if (rc != 0)
1643                return (rc);
1644
1645        if (resends) {
1646                req->rq_generation_set = 1;
1647                req->rq_import_generation = generation;
1648                req->rq_sent = cfs_time_current_sec() + resends;
1649        }
1650
1651        rc = ptlrpc_queue_wait(req);
1652
1653        if (rc == -ETIMEDOUT && req->rq_resend) {
1654                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1655                ptlrpc_req_finished(req);
1656                goto restart_bulk;
1657        }
1658
1659        rc = osc_brw_fini_request(req, rc);
1660
1661        ptlrpc_req_finished(req);
1662        /* When server return -EINPROGRESS, client should always retry
1663         * regardless of the number of times the bulk was resent already.*/
1664        if (osc_recoverable_error(rc)) {
1665                resends++;
1666                if (rc != -EINPROGRESS &&
1667                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1668                        CERROR("%s: too many resend retries for object: "
1669                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1670                               POSTID(&oa->o_oi), rc);
1671                        goto out;
1672                }
1673                if (generation !=
1674                    exp->exp_obd->u.cli.cl_import->imp_generation) {
1675                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
1676                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1677                               POSTID(&oa->o_oi), rc);
1678                        goto out;
1679                }
1680
1681                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1682                                       NULL);
1683                l_wait_event(waitq, 0, &lwi);
1684
1685                goto restart_bulk;
1686        }
1687out:
1688        if (rc == -EAGAIN || rc == -EINPROGRESS)
1689                rc = -EIO;
1690        return rc;
1691}
1692
1693static int osc_brw_redo_request(struct ptlrpc_request *request,
1694                                struct osc_brw_async_args *aa, int rc)
1695{
1696        struct ptlrpc_request *new_req;
1697        struct osc_brw_async_args *new_aa;
1698        struct osc_async_page *oap;
1699
1700        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1701                  "redo for recoverable error %d", rc);
1702
1703        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1704                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1705                                  aa->aa_cli, aa->aa_oa,
1706                                  NULL /* lsm unused by osc currently */,
1707                                  aa->aa_page_count, aa->aa_ppga,
1708                                  &new_req, aa->aa_ocapa, 0, 1);
1709        if (rc)
1710                return rc;
1711
1712        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1713                if (oap->oap_request != NULL) {
1714                        LASSERTF(request == oap->oap_request,
1715                                 "request %p != oap_request %p\n",
1716                                 request, oap->oap_request);
1717                        if (oap->oap_interrupted) {
1718                                ptlrpc_req_finished(new_req);
1719                                return -EINTR;
1720                        }
1721                }
1722        }
1723        /* New request takes over pga and oaps from old request.
1724         * Note that copying a list_head doesn't work, need to move it... */
1725        aa->aa_resends++;
1726        new_req->rq_interpret_reply = request->rq_interpret_reply;
1727        new_req->rq_async_args = request->rq_async_args;
1728        /* cap resend delay to the current request timeout, this is similar to
1729         * what ptlrpc does (see after_reply()) */
1730        if (aa->aa_resends > new_req->rq_timeout)
1731                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1732        else
1733                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1734        new_req->rq_generation_set = 1;
1735        new_req->rq_import_generation = request->rq_import_generation;
1736
1737        new_aa = ptlrpc_req_async_args(new_req);
1738
1739        INIT_LIST_HEAD(&new_aa->aa_oaps);
1740        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1741        INIT_LIST_HEAD(&new_aa->aa_exts);
1742        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1743        new_aa->aa_resends = aa->aa_resends;
1744
1745        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1746                if (oap->oap_request) {
1747                        ptlrpc_req_finished(oap->oap_request);
1748                        oap->oap_request = ptlrpc_request_addref(new_req);
1749                }
1750        }
1751
1752        new_aa->aa_ocapa = aa->aa_ocapa;
1753        aa->aa_ocapa = NULL;
1754
1755        /* XXX: This code will run into problem if we're going to support
1756         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1757         * and wait for all of them to be finished. We should inherit request
1758         * set from old request. */
1759        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1760
1761        DEBUG_REQ(D_INFO, new_req, "new request");
1762        return 0;
1763}
1764
1765/*
1766 * ugh, we want disk allocation on the target to happen in offset order.  we'll
1767 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1768 * fine for our small page arrays and doesn't require allocation.  its an
1769 * insertion sort that swaps elements that are strides apart, shrinking the
1770 * stride down until its '1' and the array is sorted.
1771 */
1772static void sort_brw_pages(struct brw_page **array, int num)
1773{
1774        int stride, i, j;
1775        struct brw_page *tmp;
1776
1777        if (num == 1)
1778                return;
1779        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1780                ;
1781
1782        do {
1783                stride /= 3;
1784                for (i = stride ; i < num ; i++) {
1785                        tmp = array[i];
1786                        j = i;
1787                        while (j >= stride && array[j - stride]->off > tmp->off) {
1788                                array[j] = array[j - stride];
1789                                j -= stride;
1790                        }
1791                        array[j] = tmp;
1792                }
1793        } while (stride > 1);
1794}
1795
1796static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1797{
1798        int count = 1;
1799        int offset;
1800        int i = 0;
1801
1802        LASSERT (pages > 0);
1803        offset = pg[i]->off & ~CFS_PAGE_MASK;
1804
1805        for (;;) {
1806                pages--;
1807                if (pages == 0)  /* that's all */
1808                        return count;
1809
1810                if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1811                        return count;   /* doesn't end on page boundary */
1812
1813                i++;
1814                offset = pg[i]->off & ~CFS_PAGE_MASK;
1815                if (offset != 0)        /* doesn't start on page boundary */
1816                        return count;
1817
1818                count++;
1819        }
1820}
1821
1822static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1823{
1824        struct brw_page **ppga;
1825        int i;
1826
1827        OBD_ALLOC(ppga, sizeof(*ppga) * count);
1828        if (ppga == NULL)
1829                return NULL;
1830
1831        for (i = 0; i < count; i++)
1832                ppga[i] = pga + i;
1833        return ppga;
1834}
1835
1836static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1837{
1838        LASSERT(ppga != NULL);
1839        OBD_FREE(ppga, sizeof(*ppga) * count);
1840}
1841
1842static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1843                   obd_count page_count, struct brw_page *pga,
1844                   struct obd_trans_info *oti)
1845{
1846        struct obdo *saved_oa = NULL;
1847        struct brw_page **ppga, **orig;
1848        struct obd_import *imp = class_exp2cliimp(exp);
1849        struct client_obd *cli;
1850        int rc, page_count_orig;
1851
1852        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1853        cli = &imp->imp_obd->u.cli;
1854
1855        if (cmd & OBD_BRW_CHECK) {
1856                /* The caller just wants to know if there's a chance that this
1857                 * I/O can succeed */
1858
1859                if (imp->imp_invalid)
1860                        return -EIO;
1861                return 0;
1862        }
1863
1864        /* test_brw with a failed create can trip this, maybe others. */
1865        LASSERT(cli->cl_max_pages_per_rpc);
1866
1867        rc = 0;
1868
1869        orig = ppga = osc_build_ppga(pga, page_count);
1870        if (ppga == NULL)
1871                return -ENOMEM;
1872        page_count_orig = page_count;
1873
1874        sort_brw_pages(ppga, page_count);
1875        while (page_count) {
1876                obd_count pages_per_brw;
1877
1878                if (page_count > cli->cl_max_pages_per_rpc)
1879                        pages_per_brw = cli->cl_max_pages_per_rpc;
1880                else
1881                        pages_per_brw = page_count;
1882
1883                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1884
1885                if (saved_oa != NULL) {
1886                        /* restore previously saved oa */
1887                        *oinfo->oi_oa = *saved_oa;
1888                } else if (page_count > pages_per_brw) {
1889                        /* save a copy of oa (brw will clobber it) */
1890                        OBDO_ALLOC(saved_oa);
1891                        if (saved_oa == NULL)
1892                                GOTO(out, rc = -ENOMEM);
1893                        *saved_oa = *oinfo->oi_oa;
1894                }
1895
1896                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1897                                      pages_per_brw, ppga, oinfo->oi_capa);
1898
1899                if (rc != 0)
1900                        break;
1901
1902                page_count -= pages_per_brw;
1903                ppga += pages_per_brw;
1904        }
1905
1906out:
1907        osc_release_ppga(orig, page_count_orig);
1908
1909        if (saved_oa != NULL)
1910                OBDO_FREE(saved_oa);
1911
1912        return rc;
1913}
1914
1915static int brw_interpret(const struct lu_env *env,
1916                         struct ptlrpc_request *req, void *data, int rc)
1917{
1918        struct osc_brw_async_args *aa = data;
1919        struct osc_extent *ext;
1920        struct osc_extent *tmp;
1921        struct cl_object  *obj = NULL;
1922        struct client_obd *cli = aa->aa_cli;
1923
1924        rc = osc_brw_fini_request(req, rc);
1925        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1926        /* When server return -EINPROGRESS, client should always retry
1927         * regardless of the number of times the bulk was resent already. */
1928        if (osc_recoverable_error(rc)) {
1929                if (req->rq_import_generation !=
1930                    req->rq_import->imp_generation) {
1931                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
1932                               ""DOSTID", rc = %d.\n",
1933                               req->rq_import->imp_obd->obd_name,
1934                               POSTID(&aa->aa_oa->o_oi), rc);
1935                } else if (rc == -EINPROGRESS ||
1936                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
1937                        rc = osc_brw_redo_request(req, aa, rc);
1938                } else {
1939                        CERROR("%s: too many resent retries for object: "
1940                               ""LPU64":"LPU64", rc = %d.\n",
1941                               req->rq_import->imp_obd->obd_name,
1942                               POSTID(&aa->aa_oa->o_oi), rc);
1943                }
1944
1945                if (rc == 0)
1946                        return 0;
1947                else if (rc == -EAGAIN || rc == -EINPROGRESS)
1948                        rc = -EIO;
1949        }
1950
1951        if (aa->aa_ocapa) {
1952                capa_put(aa->aa_ocapa);
1953                aa->aa_ocapa = NULL;
1954        }
1955
1956        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1957                if (obj == NULL && rc == 0) {
1958                        obj = osc2cl(ext->oe_obj);
1959                        cl_object_get(obj);
1960                }
1961
1962                list_del_init(&ext->oe_link);
1963                osc_extent_finish(env, ext, 1, rc);
1964        }
1965        LASSERT(list_empty(&aa->aa_exts));
1966        LASSERT(list_empty(&aa->aa_oaps));
1967
1968        if (obj != NULL) {
1969                struct obdo *oa = aa->aa_oa;
1970                struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1971                unsigned long valid = 0;
1972
1973                LASSERT(rc == 0);
1974                if (oa->o_valid & OBD_MD_FLBLOCKS) {
1975                        attr->cat_blocks = oa->o_blocks;
1976                        valid |= CAT_BLOCKS;
1977                }
1978                if (oa->o_valid & OBD_MD_FLMTIME) {
1979                        attr->cat_mtime = oa->o_mtime;
1980                        valid |= CAT_MTIME;
1981                }
1982                if (oa->o_valid & OBD_MD_FLATIME) {
1983                        attr->cat_atime = oa->o_atime;
1984                        valid |= CAT_ATIME;
1985                }
1986                if (oa->o_valid & OBD_MD_FLCTIME) {
1987                        attr->cat_ctime = oa->o_ctime;
1988                        valid |= CAT_CTIME;
1989                }
1990                if (valid != 0) {
1991                        cl_object_attr_lock(obj);
1992                        cl_object_attr_set(env, obj, attr, valid);
1993                        cl_object_attr_unlock(obj);
1994                }
1995                cl_object_put(env, obj);
1996        }
1997        OBDO_FREE(aa->aa_oa);
1998
1999        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2000                          req->rq_bulk->bd_nob_transferred);
2001        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2002        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2003
2004        client_obd_list_lock(&cli->cl_loi_list_lock);
2005        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2006         * is called so we know whether to go to sync BRWs or wait for more
2007         * RPCs to complete */
2008        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2009                cli->cl_w_in_flight--;
2010        else
2011                cli->cl_r_in_flight--;
2012        osc_wake_cache_waiters(cli);
2013        client_obd_list_unlock(&cli->cl_loi_list_lock);
2014
2015        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2016        return rc;
2017}
2018
2019/**
2020 * Build an RPC by the list of extent @ext_list. The caller must ensure
2021 * that the total pages in this list are NOT over max pages per RPC.
2022 * Extents in the list must be in OES_RPC state.
2023 */
2024int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2025                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
2026{
2027        struct ptlrpc_request           *req = NULL;
2028        struct osc_extent               *ext;
2029        struct brw_page                 **pga = NULL;
2030        struct osc_brw_async_args       *aa = NULL;
2031        struct obdo                     *oa = NULL;
2032        struct osc_async_page           *oap;
2033        struct osc_async_page           *tmp;
2034        struct cl_req                   *clerq = NULL;
2035        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2036                                                                      CRT_READ;
2037        struct ldlm_lock                *lock = NULL;
2038        struct cl_req_attr              *crattr = NULL;
2039        obd_off                         starting_offset = OBD_OBJECT_EOF;
2040        obd_off                         ending_offset = 0;
2041        int                             mpflag = 0;
2042        int                             mem_tight = 0;
2043        int                             page_count = 0;
2044        int                             i;
2045        int                             rc;
2046        LIST_HEAD(rpc_list);
2047
2048        LASSERT(!list_empty(ext_list));
2049
2050        /* add pages into rpc_list to build BRW rpc */
2051        list_for_each_entry(ext, ext_list, oe_link) {
2052                LASSERT(ext->oe_state == OES_RPC);
2053                mem_tight |= ext->oe_memalloc;
2054                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2055                        ++page_count;
2056                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
2057                        if (starting_offset > oap->oap_obj_off)
2058                                starting_offset = oap->oap_obj_off;
2059                        else
2060                                LASSERT(oap->oap_page_off == 0);
2061                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
2062                                ending_offset = oap->oap_obj_off +
2063                                                oap->oap_count;
2064                        else
2065                                LASSERT(oap->oap_page_off + oap->oap_count ==
2066                                        PAGE_CACHE_SIZE);
2067                }
2068        }
2069
2070        if (mem_tight)
2071                mpflag = cfs_memory_pressure_get_and_set();
2072
2073        OBD_ALLOC(crattr, sizeof(*crattr));
2074        if (crattr == NULL)
2075                GOTO(out, rc = -ENOMEM);
2076
2077        OBD_ALLOC(pga, sizeof(*pga) * page_count);
2078        if (pga == NULL)
2079                GOTO(out, rc = -ENOMEM);
2080
2081        OBDO_ALLOC(oa);
2082        if (oa == NULL)
2083                GOTO(out, rc = -ENOMEM);
2084
2085        i = 0;
2086        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2087                struct cl_page *page = oap2cl_page(oap);
2088                if (clerq == NULL) {
2089                        clerq = cl_req_alloc(env, page, crt,
2090                                             1 /* only 1-object rpcs for now */);
2091                        if (IS_ERR(clerq))
2092                                GOTO(out, rc = PTR_ERR(clerq));
2093                        lock = oap->oap_ldlm_lock;
2094                }
2095                if (mem_tight)
2096                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2097                pga[i] = &oap->oap_brw_page;
2098                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2099                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2100                       pga[i]->pg, page_index(oap->oap_page), oap,
2101                       pga[i]->flag);
2102                i++;
2103                cl_req_page_add(env, clerq, page);
2104        }
2105
2106        /* always get the data for the obdo for the rpc */
2107        LASSERT(clerq != NULL);
2108        crattr->cra_oa = oa;
2109        cl_req_attr_set(env, clerq, crattr, ~0ULL);
2110        if (lock) {
2111                oa->o_handle = lock->l_remote_handle;
2112                oa->o_valid |= OBD_MD_FLHANDLE;
2113        }
2114
2115        rc = cl_req_prep(env, clerq);
2116        if (rc != 0) {
2117                CERROR("cl_req_prep failed: %d\n", rc);
2118                GOTO(out, rc);
2119        }
2120
2121        sort_brw_pages(pga, page_count);
2122        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2123                        pga, &req, crattr->cra_capa, 1, 0);
2124        if (rc != 0) {
2125                CERROR("prep_req failed: %d\n", rc);
2126                GOTO(out, rc);
2127        }
2128
2129        req->rq_interpret_reply = brw_interpret;
2130
2131        if (mem_tight != 0)
2132                req->rq_memalloc = 1;
2133
2134        /* Need to update the timestamps after the request is built in case
2135         * we race with setattr (locally or in queue at OST).  If OST gets
2136         * later setattr before earlier BRW (as determined by the request xid),
2137         * the OST will not use BRW timestamps.  Sadly, there is no obvious
2138         * way to do this in a single call.  bug 10150 */
2139        cl_req_attr_set(env, clerq, crattr,
2140                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2141
2142        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2143
2144        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2145        aa = ptlrpc_req_async_args(req);
2146        INIT_LIST_HEAD(&aa->aa_oaps);
2147        list_splice_init(&rpc_list, &aa->aa_oaps);
2148        INIT_LIST_HEAD(&aa->aa_exts);
2149        list_splice_init(ext_list, &aa->aa_exts);
2150        aa->aa_clerq = clerq;
2151
2152        /* queued sync pages can be torn down while the pages
2153         * were between the pending list and the rpc */
2154        tmp = NULL;
2155        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2156                /* only one oap gets a request reference */
2157                if (tmp == NULL)
2158                        tmp = oap;
2159                if (oap->oap_interrupted && !req->rq_intr) {
2160                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2161                                        oap, req);
2162                        ptlrpc_mark_interrupted(req);
2163                }
2164        }
2165        if (tmp != NULL)
2166                tmp->oap_request = ptlrpc_request_addref(req);
2167
2168        client_obd_list_lock(&cli->cl_loi_list_lock);
2169        starting_offset >>= PAGE_CACHE_SHIFT;
2170        if (cmd == OBD_BRW_READ) {
2171                cli->cl_r_in_flight++;
2172                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2173                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2174                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2175                                      starting_offset + 1);
2176        } else {
2177                cli->cl_w_in_flight++;
2178                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2179                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2180                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2181                                      starting_offset + 1);
2182        }
2183        client_obd_list_unlock(&cli->cl_loi_list_lock);
2184
2185        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2186                  page_count, aa, cli->cl_r_in_flight,
2187                  cli->cl_w_in_flight);
2188
2189        /* XXX: Maybe the caller can check the RPC bulk descriptor to
2190         * see which CPU/NUMA node the majority of pages were allocated
2191         * on, and try to assign the async RPC to the CPU core
2192         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2193         *
2194         * But on the other hand, we expect that multiple ptlrpcd
2195         * threads and the initial write sponsor can run in parallel,
2196         * especially when data checksum is enabled, which is CPU-bound
2197         * operation and single ptlrpcd thread cannot process in time.
2198         * So more ptlrpcd threads sharing BRW load
2199         * (with PDL_POLICY_ROUND) seems better.
2200         */
2201        ptlrpcd_add_req(req, pol, -1);
2202        rc = 0;
2203
2204out:
2205        if (mem_tight != 0)
2206                cfs_memory_pressure_restore(mpflag);
2207
2208        if (crattr != NULL) {
2209                capa_put(crattr->cra_capa);
2210                OBD_FREE(crattr, sizeof(*crattr));
2211        }
2212
2213        if (rc != 0) {
2214                LASSERT(req == NULL);
2215
2216                if (oa)
2217                        OBDO_FREE(oa);
2218                if (pga)
2219                        OBD_FREE(pga, sizeof(*pga) * page_count);
2220                /* this should happen rarely and is pretty bad, it makes the
2221                 * pending list not follow the dirty order */
2222                while (!list_empty(ext_list)) {
2223                        ext = list_entry(ext_list->next, struct osc_extent,
2224                                             oe_link);
2225                        list_del_init(&ext->oe_link);
2226                        osc_extent_finish(env, ext, 0, rc);
2227                }
2228                if (clerq && !IS_ERR(clerq))
2229                        cl_req_completion(env, clerq, rc);
2230        }
2231        return rc;
2232}
2233
2234static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2235                                        struct ldlm_enqueue_info *einfo)
2236{
2237        void *data = einfo->ei_cbdata;
2238        int set = 0;
2239
2240        LASSERT(lock != NULL);
2241        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2242        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2243        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2244        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2245
2246        lock_res_and_lock(lock);
2247        spin_lock(&osc_ast_guard);
2248
2249        if (lock->l_ast_data == NULL)
2250                lock->l_ast_data = data;
2251        if (lock->l_ast_data == data)
2252                set = 1;
2253
2254        spin_unlock(&osc_ast_guard);
2255        unlock_res_and_lock(lock);
2256
2257        return set;
2258}
2259
2260static int osc_set_data_with_check(struct lustre_handle *lockh,
2261                                   struct ldlm_enqueue_info *einfo)
2262{
2263        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2264        int set = 0;
2265
2266        if (lock != NULL) {
2267                set = osc_set_lock_data_with_check(lock, einfo);
2268                LDLM_LOCK_PUT(lock);
2269        } else
2270                CERROR("lockh %p, data %p - client evicted?\n",
2271                       lockh, einfo->ei_cbdata);
2272        return set;
2273}
2274
2275static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2276                             ldlm_iterator_t replace, void *data)
2277{
2278        struct ldlm_res_id res_id;
2279        struct obd_device *obd = class_exp2obd(exp);
2280
2281        ostid_build_res_name(&lsm->lsm_oi, &res_id);
2282        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2283        return 0;
2284}
2285
2286/* find any ldlm lock of the inode in osc
2287 * return 0    not find
2288 *      1    find one
2289 *      < 0    error */
2290static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2291                           ldlm_iterator_t replace, void *data)
2292{
2293        struct ldlm_res_id res_id;
2294        struct obd_device *obd = class_exp2obd(exp);
2295        int rc = 0;
2296
2297        ostid_build_res_name(&lsm->lsm_oi, &res_id);
2298        rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2299        if (rc == LDLM_ITER_STOP)
2300                return(1);
2301        if (rc == LDLM_ITER_CONTINUE)
2302                return(0);
2303        return(rc);
2304}
2305
2306static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2307                            obd_enqueue_update_f upcall, void *cookie,
2308                            __u64 *flags, int agl, int rc)
2309{
2310        int intent = *flags & LDLM_FL_HAS_INTENT;
2311
2312        if (intent) {
2313                /* The request was created before ldlm_cli_enqueue call. */
2314                if (rc == ELDLM_LOCK_ABORTED) {
2315                        struct ldlm_reply *rep;
2316                        rep = req_capsule_server_get(&req->rq_pill,
2317                                                     &RMF_DLM_REP);
2318
2319                        LASSERT(rep != NULL);
2320                        rep->lock_policy_res1 =
2321                                ptlrpc_status_ntoh(rep->lock_policy_res1);
2322                        if (rep->lock_policy_res1)
2323                                rc = rep->lock_policy_res1;
2324                }
2325        }
2326
2327        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2328            (rc == 0)) {
2329                *flags |= LDLM_FL_LVB_READY;
2330                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2331                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2332        }
2333
2334        /* Call the update callback. */
2335        rc = (*upcall)(cookie, rc);
2336        return rc;
2337}
2338
2339static int osc_enqueue_interpret(const struct lu_env *env,
2340                                 struct ptlrpc_request *req,
2341                                 struct osc_enqueue_args *aa, int rc)
2342{
2343        struct ldlm_lock *lock;
2344        struct lustre_handle handle;
2345        __u32 mode;
2346        struct ost_lvb *lvb;
2347        __u32 lvb_len;
2348        __u64 *flags = aa->oa_flags;
2349
2350        /* Make a local copy of a lock handle and a mode, because aa->oa_*
2351         * might be freed anytime after lock upcall has been called. */
2352        lustre_handle_copy(&handle, aa->oa_lockh);
2353        mode = aa->oa_ei->ei_mode;
2354
2355        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2356         * be valid. */
2357        lock = ldlm_handle2lock(&handle);
2358
2359        /* Take an additional reference so that a blocking AST that
2360         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2361         * to arrive after an upcall has been executed by
2362         * osc_enqueue_fini(). */
2363        ldlm_lock_addref(&handle, mode);
2364
2365        /* Let CP AST to grant the lock first. */
2366        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2367
2368        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2369                lvb = NULL;
2370                lvb_len = 0;
2371        } else {
2372                lvb = aa->oa_lvb;
2373                lvb_len = sizeof(*aa->oa_lvb);
2374        }
2375
2376        /* Complete obtaining the lock procedure. */
2377        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2378                                   mode, flags, lvb, lvb_len, &handle, rc);
2379        /* Complete osc stuff. */
2380        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2381                              flags, aa->oa_agl, rc);
2382
2383        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2384
2385        /* Release the lock for async request. */
2386        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2387                /*
2388                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2389                 * not already released by
2390                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2391                 */
2392                ldlm_lock_decref(&handle, mode);
2393
2394        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2395                 aa->oa_lockh, req, aa);
2396        ldlm_lock_decref(&handle, mode);
2397        LDLM_LOCK_PUT(lock);
2398        return rc;
2399}
2400
2401void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2402                        struct lov_oinfo *loi, int flags,
2403                        struct ost_lvb *lvb, __u32 mode, int rc)
2404{
2405        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2406
2407        if (rc == ELDLM_OK) {
2408                __u64 tmp;
2409
2410                LASSERT(lock != NULL);
2411                loi->loi_lvb = *lvb;
2412                tmp = loi->loi_lvb.lvb_size;
2413                /* Extend KMS up to the end of this lock and no further
2414                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2415                if (tmp > lock->l_policy_data.l_extent.end)
2416                        tmp = lock->l_policy_data.l_extent.end + 1;
2417                if (tmp >= loi->loi_kms) {
2418                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2419                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2420                        loi_kms_set(loi, tmp);
2421                } else {
2422                        LDLM_DEBUG(lock, "lock acquired, setting rss="
2423                                   LPU64"; leaving kms="LPU64", end="LPU64,
2424                                   loi->loi_lvb.lvb_size, loi->loi_kms,
2425                                   lock->l_policy_data.l_extent.end);
2426                }
2427                ldlm_lock_allow_match(lock);
2428        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2429                LASSERT(lock != NULL);
2430                loi->loi_lvb = *lvb;
2431                ldlm_lock_allow_match(lock);
2432                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2433                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2434                rc = ELDLM_OK;
2435        }
2436
2437        if (lock != NULL) {
2438                if (rc != ELDLM_OK)
2439                        ldlm_lock_fail_match(lock);
2440
2441                LDLM_LOCK_PUT(lock);
2442        }
2443}
2444EXPORT_SYMBOL(osc_update_enqueue);
2445
2446struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2447
2448/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2449 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2450 * other synchronous requests, however keeping some locks and trying to obtain
2451 * others may take a considerable amount of time in a case of ost failure; and
2452 * when other sync requests do not get released lock from a client, the client
2453 * is excluded from the cluster -- such scenarious make the life difficult, so
2454 * release locks just after they are obtained. */
2455int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2456                     __u64 *flags, ldlm_policy_data_t *policy,
2457                     struct ost_lvb *lvb, int kms_valid,
2458                     obd_enqueue_update_f upcall, void *cookie,
2459                     struct ldlm_enqueue_info *einfo,
2460                     struct lustre_handle *lockh,
2461                     struct ptlrpc_request_set *rqset, int async, int agl)
2462{
2463        struct obd_device *obd = exp->exp_obd;
2464        struct ptlrpc_request *req = NULL;
2465        int intent = *flags & LDLM_FL_HAS_INTENT;
2466        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2467        ldlm_mode_t mode;
2468        int rc;
2469
2470        /* Filesystem lock extents are extended to page boundaries so that
2471         * dealing with the page cache is a little smoother.  */
2472        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2473        policy->l_extent.end |= ~CFS_PAGE_MASK;
2474
2475        /*
2476         * kms is not valid when either object is completely fresh (so that no
2477         * locks are cached), or object was evicted. In the latter case cached
2478         * lock cannot be used, because it would prime inode state with
2479         * potentially stale LVB.
2480         */
2481        if (!kms_valid)
2482                goto no_match;
2483
2484        /* Next, search for already existing extent locks that will cover us */
2485        /* If we're trying to read, we also search for an existing PW lock.  The
2486         * VFS and page cache already protect us locally, so lots of readers/
2487         * writers can share a single PW lock.
2488         *
2489         * There are problems with conversion deadlocks, so instead of
2490         * converting a read lock to a write lock, we'll just enqueue a new
2491         * one.
2492         *
2493         * At some point we should cancel the read lock instead of making them
2494         * send us a blocking callback, but there are problems with canceling
2495         * locks out from other users right now, too. */
2496        mode = einfo->ei_mode;
2497        if (einfo->ei_mode == LCK_PR)
2498                mode |= LCK_PW;
2499        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2500                               einfo->ei_type, policy, mode, lockh, 0);
2501        if (mode) {
2502                struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2503
2504                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2505                        /* For AGL, if enqueue RPC is sent but the lock is not
2506                         * granted, then skip to process this strpe.
2507                         * Return -ECANCELED to tell the caller. */
2508                        ldlm_lock_decref(lockh, mode);
2509                        LDLM_LOCK_PUT(matched);
2510                        return -ECANCELED;
2511                } else if (osc_set_lock_data_with_check(matched, einfo)) {
2512                        *flags |= LDLM_FL_LVB_READY;
2513                        /* addref the lock only if not async requests and PW
2514                         * lock is matched whereas we asked for PR. */
2515                        if (!rqset && einfo->ei_mode != mode)
2516                                ldlm_lock_addref(lockh, LCK_PR);
2517                        if (intent) {
2518                                /* I would like to be able to ASSERT here that
2519                                 * rss <= kms, but I can't, for reasons which
2520                                 * are explained in lov_enqueue() */
2521                        }
2522
2523                        /* We already have a lock, and it's referenced.
2524                         *
2525                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
2526                         * AGL upcall may change it to CLS_HELD directly. */
2527                        (*upcall)(cookie, ELDLM_OK);
2528
2529                        if (einfo->ei_mode != mode)
2530                                ldlm_lock_decref(lockh, LCK_PW);
2531                        else if (rqset)
2532                                /* For async requests, decref the lock. */
2533                                ldlm_lock_decref(lockh, einfo->ei_mode);
2534                        LDLM_LOCK_PUT(matched);
2535                        return ELDLM_OK;
2536                } else {
2537                        ldlm_lock_decref(lockh, mode);
2538                        LDLM_LOCK_PUT(matched);
2539                }
2540        }
2541
2542 no_match:
2543        if (intent) {
2544                LIST_HEAD(cancels);
2545                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2546                                           &RQF_LDLM_ENQUEUE_LVB);
2547                if (req == NULL)
2548                        return -ENOMEM;
2549
2550                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2551                if (rc) {
2552                        ptlrpc_request_free(req);
2553                        return rc;
2554                }
2555
2556                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2557                                     sizeof(*lvb));
2558                ptlrpc_request_set_replen(req);
2559        }
2560
2561        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2562        *flags &= ~LDLM_FL_BLOCK_GRANTED;
2563
2564        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2565                              sizeof(*lvb), LVB_T_OST, lockh, async);
2566        if (rqset) {
2567                if (!rc) {
2568                        struct osc_enqueue_args *aa;
2569                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2570                        aa = ptlrpc_req_async_args(req);
2571                        aa->oa_ei = einfo;
2572                        aa->oa_exp = exp;
2573                        aa->oa_flags  = flags;
2574                        aa->oa_upcall = upcall;
2575                        aa->oa_cookie = cookie;
2576                        aa->oa_lvb    = lvb;
2577                        aa->oa_lockh  = lockh;
2578                        aa->oa_agl    = !!agl;
2579
2580                        req->rq_interpret_reply =
2581                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
2582                        if (rqset == PTLRPCD_SET)
2583                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2584                        else
2585                                ptlrpc_set_add_req(rqset, req);
2586                } else if (intent) {
2587                        ptlrpc_req_finished(req);
2588                }
2589                return rc;
2590        }
2591
2592        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2593        if (intent)
2594                ptlrpc_req_finished(req);
2595
2596        return rc;
2597}
2598
2599static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2600                       struct ldlm_enqueue_info *einfo,
2601                       struct ptlrpc_request_set *rqset)
2602{
2603        struct ldlm_res_id res_id;
2604        int rc;
2605
2606        ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2607        rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2608                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2609                              oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2610                              oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2611                              rqset, rqset != NULL, 0);
2612        return rc;
2613}
2614
2615int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2616                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2617                   int *flags, void *data, struct lustre_handle *lockh,
2618                   int unref)
2619{
2620        struct obd_device *obd = exp->exp_obd;
2621        int lflags = *flags;
2622        ldlm_mode_t rc;
2623
2624        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2625                return -EIO;
2626
2627        /* Filesystem lock extents are extended to page boundaries so that
2628         * dealing with the page cache is a little smoother */
2629        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2630        policy->l_extent.end |= ~CFS_PAGE_MASK;
2631
2632        /* Next, search for already existing extent locks that will cover us */
2633        /* If we're trying to read, we also search for an existing PW lock.  The
2634         * VFS and page cache already protect us locally, so lots of readers/
2635         * writers can share a single PW lock. */
2636        rc = mode;
2637        if (mode == LCK_PR)
2638                rc |= LCK_PW;
2639        rc = ldlm_lock_match(obd->obd_namespace, lflags,
2640                             res_id, type, policy, rc, lockh, unref);
2641        if (rc) {
2642                if (data != NULL) {
2643                        if (!osc_set_data_with_check(lockh, data)) {
2644                                if (!(lflags & LDLM_FL_TEST_LOCK))
2645                                        ldlm_lock_decref(lockh, rc);
2646                                return 0;
2647                        }
2648                }
2649                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2650                        ldlm_lock_addref(lockh, LCK_PR);
2651                        ldlm_lock_decref(lockh, LCK_PW);
2652                }
2653                return rc;
2654        }
2655        return rc;
2656}
2657
2658int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2659{
2660        if (unlikely(mode == LCK_GROUP))
2661                ldlm_lock_decref_and_cancel(lockh, mode);
2662        else
2663                ldlm_lock_decref(lockh, mode);
2664
2665        return 0;
2666}
2667
2668static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2669                      __u32 mode, struct lustre_handle *lockh)
2670{
2671        return osc_cancel_base(lockh, mode);
2672}
2673
2674static int osc_cancel_unused(struct obd_export *exp,
2675                             struct lov_stripe_md *lsm,
2676                             ldlm_cancel_flags_t flags,
2677                             void *opaque)
2678{
2679        struct obd_device *obd = class_exp2obd(exp);
2680        struct ldlm_res_id res_id, *resp = NULL;
2681
2682        if (lsm != NULL) {
2683                ostid_build_res_name(&lsm->lsm_oi, &res_id);
2684                resp = &res_id;
2685        }
2686
2687        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2688}
2689
2690static int osc_statfs_interpret(const struct lu_env *env,
2691                                struct ptlrpc_request *req,
2692                                struct osc_async_args *aa, int rc)
2693{
2694        struct obd_statfs *msfs;
2695
2696        if (rc == -EBADR)
2697                /* The request has in fact never been sent
2698                 * due to issues at a higher level (LOV).
2699                 * Exit immediately since the caller is
2700                 * aware of the problem and takes care
2701                 * of the clean up */
2702                 return rc;
2703
2704        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2705            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2706                GOTO(out, rc = 0);
2707
2708        if (rc != 0)
2709                GOTO(out, rc);
2710
2711        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2712        if (msfs == NULL) {
2713                GOTO(out, rc = -EPROTO);
2714        }
2715
2716        *aa->aa_oi->oi_osfs = *msfs;
2717out:
2718        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2719        return rc;
2720}
2721
2722static int osc_statfs_async(struct obd_export *exp,
2723                            struct obd_info *oinfo, __u64 max_age,
2724                            struct ptlrpc_request_set *rqset)
2725{
2726        struct obd_device     *obd = class_exp2obd(exp);
2727        struct ptlrpc_request *req;
2728        struct osc_async_args *aa;
2729        int                 rc;
2730
2731        /* We could possibly pass max_age in the request (as an absolute
2732         * timestamp or a "seconds.usec ago") so the target can avoid doing
2733         * extra calls into the filesystem if that isn't necessary (e.g.
2734         * during mount that would help a bit).  Having relative timestamps
2735         * is not so great if request processing is slow, while absolute
2736         * timestamps are not ideal because they need time synchronization. */
2737        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2738        if (req == NULL)
2739                return -ENOMEM;
2740
2741        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2742        if (rc) {
2743                ptlrpc_request_free(req);
2744                return rc;
2745        }
2746        ptlrpc_request_set_replen(req);
2747        req->rq_request_portal = OST_CREATE_PORTAL;
2748        ptlrpc_at_set_req_timeout(req);
2749
2750        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2751                /* procfs requests not want stat in wait for avoid deadlock */
2752                req->rq_no_resend = 1;
2753                req->rq_no_delay = 1;
2754        }
2755
2756        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2757        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2758        aa = ptlrpc_req_async_args(req);
2759        aa->aa_oi = oinfo;
2760
2761        ptlrpc_set_add_req(rqset, req);
2762        return 0;
2763}
2764
2765static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2766                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2767{
2768        struct obd_device     *obd = class_exp2obd(exp);
2769        struct obd_statfs     *msfs;
2770        struct ptlrpc_request *req;
2771        struct obd_import     *imp = NULL;
2772        int rc;
2773
2774        /*Since the request might also come from lprocfs, so we need
2775         *sync this with client_disconnect_export Bug15684*/
2776        down_read(&obd->u.cli.cl_sem);
2777        if (obd->u.cli.cl_import)
2778                imp = class_import_get(obd->u.cli.cl_import);
2779        up_read(&obd->u.cli.cl_sem);
2780        if (!imp)
2781                return -ENODEV;
2782
2783        /* We could possibly pass max_age in the request (as an absolute
2784         * timestamp or a "seconds.usec ago") so the target can avoid doing
2785         * extra calls into the filesystem if that isn't necessary (e.g.
2786         * during mount that would help a bit).  Having relative timestamps
2787         * is not so great if request processing is slow, while absolute
2788         * timestamps are not ideal because they need time synchronization. */
2789        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2790
2791        class_import_put(imp);
2792
2793        if (req == NULL)
2794                return -ENOMEM;
2795
2796        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2797        if (rc) {
2798                ptlrpc_request_free(req);
2799                return rc;
2800        }
2801        ptlrpc_request_set_replen(req);
2802        req->rq_request_portal = OST_CREATE_PORTAL;
2803        ptlrpc_at_set_req_timeout(req);
2804
2805        if (flags & OBD_STATFS_NODELAY) {
2806                /* procfs requests not want stat in wait for avoid deadlock */
2807                req->rq_no_resend = 1;
2808                req->rq_no_delay = 1;
2809        }
2810
2811        rc = ptlrpc_queue_wait(req);
2812        if (rc)
2813                GOTO(out, rc);
2814
2815        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2816        if (msfs == NULL) {
2817                GOTO(out, rc = -EPROTO);
2818        }
2819
2820        *osfs = *msfs;
2821
2822 out:
2823        ptlrpc_req_finished(req);
2824        return rc;
2825}
2826
2827/* Retrieve object striping information.
2828 *
2829 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2830 * the maximum number of OST indices which will fit in the user buffer.
2831 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2832 */
2833static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2834{
2835        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2836        struct lov_user_md_v3 lum, *lumk;
2837        struct lov_user_ost_data_v1 *lmm_objects;
2838        int rc = 0, lum_size;
2839
2840        if (!lsm)
2841                return -ENODATA;
2842
2843        /* we only need the header part from user space to get lmm_magic and
2844         * lmm_stripe_count, (the header part is common to v1 and v3) */
2845        lum_size = sizeof(struct lov_user_md_v1);
2846        if (copy_from_user(&lum, lump, lum_size))
2847                return -EFAULT;
2848
2849        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2850            (lum.lmm_magic != LOV_USER_MAGIC_V3))
2851                return -EINVAL;
2852
2853        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2854        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2855        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2856        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2857
2858        /* we can use lov_mds_md_size() to compute lum_size
2859         * because lov_user_md_vX and lov_mds_md_vX have the same size */
2860        if (lum.lmm_stripe_count > 0) {
2861                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2862                OBD_ALLOC(lumk, lum_size);
2863                if (!lumk)
2864                        return -ENOMEM;
2865
2866                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2867                        lmm_objects =
2868                            &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2869                else
2870                        lmm_objects = &(lumk->lmm_objects[0]);
2871                lmm_objects->l_ost_oi = lsm->lsm_oi;
2872        } else {
2873                lum_size = lov_mds_md_size(0, lum.lmm_magic);
2874                lumk = &lum;
2875        }
2876
2877        lumk->lmm_oi = lsm->lsm_oi;
2878        lumk->lmm_stripe_count = 1;
2879
2880        if (copy_to_user(lump, lumk, lum_size))
2881                rc = -EFAULT;
2882
2883        if (lumk != &lum)
2884                OBD_FREE(lumk, lum_size);
2885
2886        return rc;
2887}
2888
2889
2890static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2891                         void *karg, void *uarg)
2892{
2893        struct obd_device *obd = exp->exp_obd;
2894        struct obd_ioctl_data *data = karg;
2895        int err = 0;
2896
2897        if (!try_module_get(THIS_MODULE)) {
2898                CERROR("Can't get module. Is it alive?");
2899                return -EINVAL;
2900        }
2901        switch (cmd) {
2902        case OBD_IOC_LOV_GET_CONFIG: {
2903                char *buf;
2904                struct lov_desc *desc;
2905                struct obd_uuid uuid;
2906
2907                buf = NULL;
2908                len = 0;
2909                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2910                        GOTO(out, err = -EINVAL);
2911
2912                data = (struct obd_ioctl_data *)buf;
2913
2914                if (sizeof(*desc) > data->ioc_inllen1) {
2915                        obd_ioctl_freedata(buf, len);
2916                        GOTO(out, err = -EINVAL);
2917                }
2918
2919                if (data->ioc_inllen2 < sizeof(uuid)) {
2920                        obd_ioctl_freedata(buf, len);
2921                        GOTO(out, err = -EINVAL);
2922                }
2923
2924                desc = (struct lov_desc *)data->ioc_inlbuf1;
2925                desc->ld_tgt_count = 1;
2926                desc->ld_active_tgt_count = 1;
2927                desc->ld_default_stripe_count = 1;
2928                desc->ld_default_stripe_size = 0;
2929                desc->ld_default_stripe_offset = 0;
2930                desc->ld_pattern = 0;
2931                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2932
2933                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2934
2935                err = copy_to_user((void *)uarg, buf, len);
2936                if (err)
2937                        err = -EFAULT;
2938                obd_ioctl_freedata(buf, len);
2939                GOTO(out, err);
2940        }
2941        case LL_IOC_LOV_SETSTRIPE:
2942                err = obd_alloc_memmd(exp, karg);
2943                if (err > 0)
2944                        err = 0;
2945                GOTO(out, err);
2946        case LL_IOC_LOV_GETSTRIPE:
2947                err = osc_getstripe(karg, uarg);
2948                GOTO(out, err);
2949        case OBD_IOC_CLIENT_RECOVER:
2950                err = ptlrpc_recover_import(obd->u.cli.cl_import,
2951                                            data->ioc_inlbuf1, 0);
2952                if (err > 0)
2953                        err = 0;
2954                GOTO(out, err);
2955        case IOC_OSC_SET_ACTIVE:
2956                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2957                                               data->ioc_offset);
2958                GOTO(out, err);
2959        case OBD_IOC_POLL_QUOTACHECK:
2960                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2961                GOTO(out, err);
2962        case OBD_IOC_PING_TARGET:
2963                err = ptlrpc_obd_ping(obd);
2964                GOTO(out, err);
2965        default:
2966                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2967                       cmd, current_comm());
2968                GOTO(out, err = -ENOTTY);
2969        }
2970out:
2971        module_put(THIS_MODULE);
2972        return err;
2973}
2974
2975static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2976                        obd_count keylen, void *key, __u32 *vallen, void *val,
2977                        struct lov_stripe_md *lsm)
2978{
2979        if (!vallen || !val)
2980                return -EFAULT;
2981
2982        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2983                __u32 *stripe = val;
2984                *vallen = sizeof(*stripe);
2985                *stripe = 0;
2986                return 0;
2987        } else if (KEY_IS(KEY_LAST_ID)) {
2988                struct ptlrpc_request *req;
2989                obd_id          *reply;
2990                char              *tmp;
2991                int                 rc;
2992
2993                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2994                                           &RQF_OST_GET_INFO_LAST_ID);
2995                if (req == NULL)
2996                        return -ENOMEM;
2997
2998                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2999                                     RCL_CLIENT, keylen);
3000                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3001                if (rc) {
3002                        ptlrpc_request_free(req);
3003                        return rc;
3004                }
3005
3006                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3007                memcpy(tmp, key, keylen);
3008
3009                req->rq_no_delay = req->rq_no_resend = 1;
3010                ptlrpc_request_set_replen(req);
3011                rc = ptlrpc_queue_wait(req);
3012                if (rc)
3013                        GOTO(out, rc);
3014
3015                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3016                if (reply == NULL)
3017                        GOTO(out, rc = -EPROTO);
3018
3019                *((obd_id *)val) = *reply;
3020        out:
3021                ptlrpc_req_finished(req);
3022                return rc;
3023        } else if (KEY_IS(KEY_FIEMAP)) {
3024                struct ll_fiemap_info_key *fm_key =
3025                                (struct ll_fiemap_info_key *)key;
3026                struct ldlm_res_id       res_id;
3027                ldlm_policy_data_t       policy;
3028                struct lustre_handle     lockh;
3029                ldlm_mode_t              mode = 0;
3030                struct ptlrpc_request   *req;
3031                struct ll_user_fiemap   *reply;
3032                char                    *tmp;
3033                int                      rc;
3034
3035                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3036                        goto skip_locking;
3037
3038                policy.l_extent.start = fm_key->fiemap.fm_start &
3039                                                CFS_PAGE_MASK;
3040
3041                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3042                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3043                        policy.l_extent.end = OBD_OBJECT_EOF;
3044                else
3045                        policy.l_extent.end = (fm_key->fiemap.fm_start +
3046                                fm_key->fiemap.fm_length +
3047                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3048
3049                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3050                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3051                                       LDLM_FL_BLOCK_GRANTED |
3052                                       LDLM_FL_LVB_READY,
3053                                       &res_id, LDLM_EXTENT, &policy,
3054                                       LCK_PR | LCK_PW, &lockh, 0);
3055                if (mode) { /* lock is cached on client */
3056                        if (mode != LCK_PR) {
3057                                ldlm_lock_addref(&lockh, LCK_PR);
3058                                ldlm_lock_decref(&lockh, LCK_PW);
3059                        }
3060                } else { /* no cached lock, needs acquire lock on server side */
3061                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3062                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3063                }
3064
3065skip_locking:
3066                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3067                                           &RQF_OST_GET_INFO_FIEMAP);
3068                if (req == NULL)
3069                        GOTO(drop_lock, rc = -ENOMEM);
3070
3071                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3072                                     RCL_CLIENT, keylen);
3073                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3074                                     RCL_CLIENT, *vallen);
3075                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3076                                     RCL_SERVER, *vallen);
3077
3078                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3079                if (rc) {
3080                        ptlrpc_request_free(req);
3081                        GOTO(drop_lock, rc);
3082                }
3083
3084                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3085                memcpy(tmp, key, keylen);
3086                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3087                memcpy(tmp, val, *vallen);
3088
3089                ptlrpc_request_set_replen(req);
3090                rc = ptlrpc_queue_wait(req);
3091                if (rc)
3092                        GOTO(fini_req, rc);
3093
3094                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3095                if (reply == NULL)
3096                        GOTO(fini_req, rc = -EPROTO);
3097
3098                memcpy(val, reply, *vallen);
3099fini_req:
3100                ptlrpc_req_finished(req);
3101drop_lock:
3102                if (mode)
3103                        ldlm_lock_decref(&lockh, LCK_PR);
3104                return rc;
3105        }
3106
3107        return -EINVAL;
3108}
3109
3110static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3111                              obd_count keylen, void *key, obd_count vallen,
3112                              void *val, struct ptlrpc_request_set *set)
3113{
3114        struct ptlrpc_request *req;
3115        struct obd_device     *obd = exp->exp_obd;
3116        struct obd_import     *imp = class_exp2cliimp(exp);
3117        char              *tmp;
3118        int                 rc;
3119
3120        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3121
3122        if (KEY_IS(KEY_CHECKSUM)) {
3123                if (vallen != sizeof(int))
3124                        return -EINVAL;
3125                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3126                return 0;
3127        }
3128
3129        if (KEY_IS(KEY_SPTLRPC_CONF)) {
3130                sptlrpc_conf_client_adapt(obd);
3131                return 0;
3132        }
3133
3134        if (KEY_IS(KEY_FLUSH_CTX)) {
3135                sptlrpc_import_flush_my_ctx(imp);
3136                return 0;
3137        }
3138
3139        if (KEY_IS(KEY_CACHE_SET)) {
3140                struct client_obd *cli = &obd->u.cli;
3141
3142                LASSERT(cli->cl_cache == NULL); /* only once */
3143                cli->cl_cache = (struct cl_client_cache *)val;
3144                atomic_inc(&cli->cl_cache->ccc_users);
3145                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3146
3147                /* add this osc into entity list */
3148                LASSERT(list_empty(&cli->cl_lru_osc));
3149                spin_lock(&cli->cl_cache->ccc_lru_lock);
3150                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3151                spin_unlock(&cli->cl_cache->ccc_lru_lock);
3152
3153                return 0;
3154        }
3155
3156        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3157                struct client_obd *cli = &obd->u.cli;
3158                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3159                int target = *(int *)val;
3160
3161                nr = osc_lru_shrink(cli, min(nr, target));
3162                *(int *)val -= nr;
3163                return 0;
3164        }
3165
3166        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3167                return -EINVAL;
3168
3169        /* We pass all other commands directly to OST. Since nobody calls osc
3170           methods directly and everybody is supposed to go through LOV, we
3171           assume lov checked invalid values for us.
3172           The only recognised values so far are evict_by_nid and mds_conn.
3173           Even if something bad goes through, we'd get a -EINVAL from OST
3174           anyway. */
3175
3176        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3177                                                &RQF_OST_SET_GRANT_INFO :
3178                                                &RQF_OBD_SET_INFO);
3179        if (req == NULL)
3180                return -ENOMEM;
3181
3182        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3183                             RCL_CLIENT, keylen);
3184        if (!KEY_IS(KEY_GRANT_SHRINK))
3185                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3186                                     RCL_CLIENT, vallen);
3187        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3188        if (rc) {
3189                ptlrpc_request_free(req);
3190                return rc;
3191        }
3192
3193        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3194        memcpy(tmp, key, keylen);
3195        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3196                                                        &RMF_OST_BODY :
3197                                                        &RMF_SETINFO_VAL);
3198        memcpy(tmp, val, vallen);
3199
3200        if (KEY_IS(KEY_GRANT_SHRINK)) {
3201                struct osc_grant_args *aa;
3202                struct obdo *oa;
3203
3204                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3205                aa = ptlrpc_req_async_args(req);
3206                OBDO_ALLOC(oa);
3207                if (!oa) {
3208                        ptlrpc_req_finished(req);
3209                        return -ENOMEM;
3210                }
3211                *oa = ((struct ost_body *)val)->oa;
3212                aa->aa_oa = oa;
3213                req->rq_interpret_reply = osc_shrink_grant_interpret;
3214        }
3215
3216        ptlrpc_request_set_replen(req);
3217        if (!KEY_IS(KEY_GRANT_SHRINK)) {
3218                LASSERT(set != NULL);
3219                ptlrpc_set_add_req(set, req);
3220                ptlrpc_check_set(NULL, set);
3221        } else
3222                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3223
3224        return 0;
3225}
3226
3227
/*
 * llog initialisation hook of the OSC.
 *
 * This entry point is not supposed to be used together with LOD/OSP and is
 * slated for removal, so reaching it is treated as a bug.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
			 struct obd_device *disk_obd, int *index)
{
	LBUG();

	return 0;
}
3236
3237static int osc_llog_finish(struct obd_device *obd, int count)
3238{
3239        struct llog_ctxt *ctxt;
3240
3241        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3242        if (ctxt) {
3243                llog_cat_close(NULL, ctxt->loc_handle);
3244                llog_cleanup(NULL, ctxt);
3245        }
3246
3247        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3248        if (ctxt)
3249                llog_cleanup(NULL, ctxt);
3250        return 0;
3251}
3252
3253static int osc_reconnect(const struct lu_env *env,
3254                         struct obd_export *exp, struct obd_device *obd,
3255                         struct obd_uuid *cluuid,
3256                         struct obd_connect_data *data,
3257                         void *localdata)
3258{
3259        struct client_obd *cli = &obd->u.cli;
3260
3261        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3262                long lost_grant;
3263
3264                client_obd_list_lock(&cli->cl_loi_list_lock);
3265                data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3266                                2 * cli_brw_size(obd);
3267                lost_grant = cli->cl_lost_grant;
3268                cli->cl_lost_grant = 0;
3269                client_obd_list_unlock(&cli->cl_loi_list_lock);
3270
3271                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3272                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3273                       data->ocd_version, data->ocd_grant, lost_grant);
3274        }
3275
3276        return 0;
3277}
3278
3279static int osc_disconnect(struct obd_export *exp)
3280{
3281        struct obd_device *obd = class_exp2obd(exp);
3282        struct llog_ctxt  *ctxt;
3283        int rc;
3284
3285        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3286        if (ctxt) {
3287                if (obd->u.cli.cl_conn_count == 1) {
3288                        /* Flush any remaining cancel messages out to the
3289                         * target */
3290                        llog_sync(ctxt, exp, 0);
3291                }
3292                llog_ctxt_put(ctxt);
3293        } else {
3294                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3295                       obd);
3296        }
3297
3298        rc = client_disconnect_export(exp);
3299        /**
3300         * Initially we put del_shrink_grant before disconnect_export, but it
3301         * causes the following problem if setup (connect) and cleanup
3302         * (disconnect) are tangled together.
3303         *      connect p1                   disconnect p2
3304         *   ptlrpc_connect_import
3305         *     ...............         class_manual_cleanup
3306         *                                   osc_disconnect
3307         *                                   del_shrink_grant
3308         *   ptlrpc_connect_interrupt
3309         *     init_grant_shrink
3310         *   add this client to shrink list
3311         *                                    cleanup_osc
3312         * Bang! pinger trigger the shrink.
3313         * So the osc should be disconnected from the shrink list, after we
3314         * are sure the import has been destroyed. BUG18662
3315         */
3316        if (obd->u.cli.cl_import == NULL)
3317                osc_del_shrink_grant(&obd->u.cli);
3318        return rc;
3319}
3320
3321static int osc_import_event(struct obd_device *obd,
3322                            struct obd_import *imp,
3323                            enum obd_import_event event)
3324{
3325        struct client_obd *cli;
3326        int rc = 0;
3327
3328        LASSERT(imp->imp_obd == obd);
3329
3330        switch (event) {
3331        case IMP_EVENT_DISCON: {
3332                cli = &obd->u.cli;
3333                client_obd_list_lock(&cli->cl_loi_list_lock);
3334                cli->cl_avail_grant = 0;
3335                cli->cl_lost_grant = 0;
3336                client_obd_list_unlock(&cli->cl_loi_list_lock);
3337                break;
3338        }
3339        case IMP_EVENT_INACTIVE: {
3340                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3341                break;
3342        }
3343        case IMP_EVENT_INVALIDATE: {
3344                struct ldlm_namespace *ns = obd->obd_namespace;
3345                struct lu_env    *env;
3346                int                 refcheck;
3347
3348                env = cl_env_get(&refcheck);
3349                if (!IS_ERR(env)) {
3350                        /* Reset grants */
3351                        cli = &obd->u.cli;
3352                        /* all pages go to failing rpcs due to the invalid
3353                         * import */
3354                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3355
3356                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3357                        cl_env_put(env, &refcheck);
3358                } else
3359                        rc = PTR_ERR(env);
3360                break;
3361        }
3362        case IMP_EVENT_ACTIVE: {
3363                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3364                break;
3365        }
3366        case IMP_EVENT_OCD: {
3367                struct obd_connect_data *ocd = &imp->imp_connect_data;
3368
3369                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3370                        osc_init_grant(&obd->u.cli, ocd);
3371
3372                /* See bug 7198 */
3373                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3374                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3375
3376                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3377                break;
3378        }
3379        case IMP_EVENT_DEACTIVATE: {
3380                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3381                break;
3382        }
3383        case IMP_EVENT_ACTIVATE: {
3384                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3385                break;
3386        }
3387        default:
3388                CERROR("Unknown import event %d\n", event);
3389                LBUG();
3390        }
3391        return rc;
3392}
3393
3394/**
3395 * Determine whether the lock can be canceled before replaying the lock
3396 * during recovery, see bug16774 for detailed information.
3397 *
3398 * \retval zero the lock can't be canceled
3399 * \retval other ok to cancel
3400 */
3401static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3402{
3403        check_res_locked(lock->l_resource);
3404
3405        /*
3406         * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3407         *
3408         * XXX as a future improvement, we can also cancel unused write lock
3409         * if it doesn't have dirty data and active mmaps.
3410         */
3411        if (lock->l_resource->lr_type == LDLM_EXTENT &&
3412            (lock->l_granted_mode == LCK_PR ||
3413             lock->l_granted_mode == LCK_CR) &&
3414            (osc_dlm_lock_pageref(lock) == 0))
3415                return 1;
3416
3417        return 0;
3418}
3419
3420static int brw_queue_work(const struct lu_env *env, void *data)
3421{
3422        struct client_obd *cli = data;
3423
3424        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3425
3426        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3427        return 0;
3428}
3429
3430int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3431{
3432        struct lprocfs_static_vars lvars = { 0 };
3433        struct client_obd         *cli = &obd->u.cli;
3434        void                   *handler;
3435        int                     rc;
3436
3437        rc = ptlrpcd_addref();
3438        if (rc)
3439                return rc;
3440
3441        rc = client_obd_setup(obd, lcfg);
3442        if (rc)
3443                GOTO(out_ptlrpcd, rc);
3444
3445        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3446        if (IS_ERR(handler))
3447                GOTO(out_client_setup, rc = PTR_ERR(handler));
3448        cli->cl_writeback_work = handler;
3449
3450        rc = osc_quota_setup(obd);
3451        if (rc)
3452                GOTO(out_ptlrpcd_work, rc);
3453
3454        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3455        lprocfs_osc_init_vars(&lvars);
3456        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3457                lproc_osc_attach_seqstat(obd);
3458                sptlrpc_lprocfs_cliobd_attach(obd);
3459                ptlrpc_lprocfs_register_obd(obd);
3460        }
3461
3462        /* We need to allocate a few requests more, because
3463         * brw_interpret tries to create new requests before freeing
3464         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3465         * reserved, but I'm afraid that might be too much wasted RAM
3466         * in fact, so 2 is just my guess and still should work. */
3467        cli->cl_import->imp_rq_pool =
3468                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3469                                    OST_MAXREQSIZE,
3470                                    ptlrpc_add_rqs_to_pool);
3471
3472        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3473        ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3474        return rc;
3475
3476out_ptlrpcd_work:
3477        ptlrpcd_destroy_work(handler);
3478out_client_setup:
3479        client_obd_cleanup(obd);
3480out_ptlrpcd:
3481        ptlrpcd_decref();
3482        return rc;
3483}
3484
3485static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3486{
3487        int rc = 0;
3488
3489        switch (stage) {
3490        case OBD_CLEANUP_EARLY: {
3491                struct obd_import *imp;
3492                imp = obd->u.cli.cl_import;
3493                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3494                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3495                ptlrpc_deactivate_import(imp);
3496                spin_lock(&imp->imp_lock);
3497                imp->imp_pingable = 0;
3498                spin_unlock(&imp->imp_lock);
3499                break;
3500        }
3501        case OBD_CLEANUP_EXPORTS: {
3502                struct client_obd *cli = &obd->u.cli;
3503                /* LU-464
3504                 * for echo client, export may be on zombie list, wait for
3505                 * zombie thread to cull it, because cli.cl_import will be
3506                 * cleared in client_disconnect_export():
3507                 *   class_export_destroy() -> obd_cleanup() ->
3508                 *   echo_device_free() -> echo_client_cleanup() ->
3509                 *   obd_disconnect() -> osc_disconnect() ->
3510                 *   client_disconnect_export()
3511                 */
3512                obd_zombie_barrier();
3513                if (cli->cl_writeback_work) {
3514                        ptlrpcd_destroy_work(cli->cl_writeback_work);
3515                        cli->cl_writeback_work = NULL;
3516                }
3517                obd_cleanup_client_import(obd);
3518                ptlrpc_lprocfs_unregister_obd(obd);
3519                lprocfs_obd_cleanup(obd);
3520                rc = obd_llog_finish(obd, 0);
3521                if (rc != 0)
3522                        CERROR("failed to cleanup llogging subsystems\n");
3523                break;
3524                }
3525        }
3526        return rc;
3527}
3528
3529int osc_cleanup(struct obd_device *obd)
3530{
3531        struct client_obd *cli = &obd->u.cli;
3532        int rc;
3533
3534        /* lru cleanup */
3535        if (cli->cl_cache != NULL) {
3536                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3537                spin_lock(&cli->cl_cache->ccc_lru_lock);
3538                list_del_init(&cli->cl_lru_osc);
3539                spin_unlock(&cli->cl_cache->ccc_lru_lock);
3540                cli->cl_lru_left = NULL;
3541                atomic_dec(&cli->cl_cache->ccc_users);
3542                cli->cl_cache = NULL;
3543        }
3544
3545        /* free memory of osc quota cache */
3546        osc_quota_cleanup(obd);
3547
3548        rc = client_obd_cleanup(obd);
3549
3550        ptlrpcd_decref();
3551        return rc;
3552}
3553
3554int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3555{
3556        struct lprocfs_static_vars lvars = { 0 };
3557        int rc = 0;
3558
3559        lprocfs_osc_init_vars(&lvars);
3560
3561        switch (lcfg->lcfg_command) {
3562        default:
3563                rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3564                                              lcfg, obd);
3565                if (rc > 0)
3566                        rc = 0;
3567                break;
3568        }
3569
3570        return(rc);
3571}
3572
3573static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3574{
3575        return osc_process_config_base(obd, buf);
3576}
3577
3578struct obd_ops osc_obd_ops = {
3579        .o_owner                = THIS_MODULE,
3580        .o_setup                = osc_setup,
3581        .o_precleanup      = osc_precleanup,
3582        .o_cleanup            = osc_cleanup,
3583        .o_add_conn          = client_import_add_conn,
3584        .o_del_conn          = client_import_del_conn,
3585        .o_connect            = client_connect_import,
3586        .o_reconnect        = osc_reconnect,
3587        .o_disconnect      = osc_disconnect,
3588        .o_statfs              = osc_statfs,
3589        .o_statfs_async  = osc_statfs_async,
3590        .o_packmd              = osc_packmd,
3591        .o_unpackmd          = osc_unpackmd,
3592        .o_create              = osc_create,
3593        .o_destroy            = osc_destroy,
3594        .o_getattr            = osc_getattr,
3595        .o_getattr_async        = osc_getattr_async,
3596        .o_setattr            = osc_setattr,
3597        .o_setattr_async        = osc_setattr_async,
3598        .o_brw            = osc_brw,
3599        .o_punch                = osc_punch,
3600        .o_sync          = osc_sync,
3601        .o_enqueue            = osc_enqueue,
3602        .o_change_cbdata        = osc_change_cbdata,
3603        .o_find_cbdata    = osc_find_cbdata,
3604        .o_cancel              = osc_cancel,
3605        .o_cancel_unused        = osc_cancel_unused,
3606        .o_iocontrol        = osc_iocontrol,
3607        .o_get_info          = osc_get_info,
3608        .o_set_info_async       = osc_set_info_async,
3609        .o_import_event  = osc_import_event,
3610        .o_llog_init        = osc_llog_init,
3611        .o_llog_finish    = osc_llog_finish,
3612        .o_process_config       = osc_process_config,
3613        .o_quotactl          = osc_quotactl,
3614        .o_quotacheck      = osc_quotacheck,
3615};
3616
3617extern struct lu_kmem_descr osc_caches[];
3618extern spinlock_t osc_ast_guard;
3619extern struct lock_class_key osc_ast_guard_class;
3620
3621int __init osc_init(void)
3622{
3623        struct lprocfs_static_vars lvars = { 0 };
3624        int rc;
3625
3626        /* print an address of _any_ initialized kernel symbol from this
3627         * module, to allow debugging with gdb that doesn't support data
3628         * symbols from modules.*/
3629        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3630
3631        rc = lu_kmem_init(osc_caches);
3632        if (rc)
3633                return rc;
3634
3635        lprocfs_osc_init_vars(&lvars);
3636
3637        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3638                                 LUSTRE_OSC_NAME, &osc_device_type);
3639        if (rc) {
3640                lu_kmem_fini(osc_caches);
3641                return rc;
3642        }
3643
3644        spin_lock_init(&osc_ast_guard);
3645        lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3646
3647        return rc;
3648}
3649
3650static void /*__exit*/ osc_exit(void)
3651{
3652        class_unregister_type(LUSTRE_OSC_NAME);
3653        lu_kmem_fini(osc_caches);
3654}
3655
3656MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3657MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3658MODULE_LICENSE("GPL");
3659MODULE_VERSION(LUSTRE_VERSION_STRING);
3660
3661module_init(osc_init);
3662module_exit(osc_exit);
3663