linux/drivers/staging/lustre/lustre/osc/osc_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_OSC
  38
  39#include "../../include/linux/libcfs/libcfs.h"
  40
  41
  42#include "../include/lustre_dlm.h"
  43#include "../include/lustre_net.h"
  44#include "../include/lustre/lustre_user.h"
  45#include "../include/obd_cksum.h"
  46#include "../include/obd_ost.h"
  47
  48#include "../include/lustre_ha.h"
  49#include "../include/lprocfs_status.h"
  50#include "../include/lustre_log.h"
  51#include "../include/lustre_debug.h"
  52#include "../include/lustre_param.h"
  53#include "../include/lustre_fid.h"
  54#include "osc_internal.h"
  55#include "osc_cl_internal.h"
  56
  57static void osc_release_ppga(struct brw_page **ppga, obd_count count);
  58static int brw_interpret(const struct lu_env *env,
  59                         struct ptlrpc_request *req, void *data, int rc);
  60int osc_cleanup(struct obd_device *obd);
  61
  62/* Pack OSC object metadata for disk storage (LE byte order). */
  63static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
  64                      struct lov_stripe_md *lsm)
  65{
  66        int lmm_size;
  67
  68        lmm_size = sizeof(**lmmp);
  69        if (lmmp == NULL)
  70                return lmm_size;
  71
  72        if (*lmmp != NULL && lsm == NULL) {
  73                OBD_FREE(*lmmp, lmm_size);
  74                *lmmp = NULL;
  75                return 0;
  76        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
  77                return -EBADF;
  78        }
  79
  80        if (*lmmp == NULL) {
  81                OBD_ALLOC(*lmmp, lmm_size);
  82                if (*lmmp == NULL)
  83                        return -ENOMEM;
  84        }
  85
  86        if (lsm)
  87                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
  88
  89        return lmm_size;
  90}
  91
  92/* Unpack OSC object metadata from disk storage (LE byte order). */
  93static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
  94                        struct lov_mds_md *lmm, int lmm_bytes)
  95{
  96        int lsm_size;
  97        struct obd_import *imp = class_exp2cliimp(exp);
  98
  99        if (lmm != NULL) {
 100                if (lmm_bytes < sizeof(*lmm)) {
 101                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
 102                               exp->exp_obd->obd_name, lmm_bytes,
 103                               (int)sizeof(*lmm));
 104                        return -EINVAL;
 105                }
 106                /* XXX LOV_MAGIC etc check? */
 107
 108                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
 109                        CERROR("%s: zero lmm_object_id: rc = %d\n",
 110                               exp->exp_obd->obd_name, -EINVAL);
 111                        return -EINVAL;
 112                }
 113        }
 114
 115        lsm_size = lov_stripe_md_size(1);
 116        if (lsmp == NULL)
 117                return lsm_size;
 118
 119        if (*lsmp != NULL && lmm == NULL) {
 120                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 121                OBD_FREE(*lsmp, lsm_size);
 122                *lsmp = NULL;
 123                return 0;
 124        }
 125
 126        if (*lsmp == NULL) {
 127                OBD_ALLOC(*lsmp, lsm_size);
 128                if (unlikely(*lsmp == NULL))
 129                        return -ENOMEM;
 130                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
 131                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
 132                        OBD_FREE(*lsmp, lsm_size);
 133                        return -ENOMEM;
 134                }
 135                loi_init((*lsmp)->lsm_oinfo[0]);
 136        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
 137                return -EBADF;
 138        }
 139
 140        if (lmm != NULL)
 141                /* XXX zero *lsmp? */
 142                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
 143
 144        if (imp != NULL &&
 145            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
 146                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
 147        else
 148                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
 149
 150        return lsm_size;
 151}
 152
 153static inline void osc_pack_capa(struct ptlrpc_request *req,
 154                                 struct ost_body *body, void *capa)
 155{
 156        struct obd_capa *oc = (struct obd_capa *)capa;
 157        struct lustre_capa *c;
 158
 159        if (!capa)
 160                return;
 161
 162        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
 163        LASSERT(c);
 164        capa_cpy(c, oc);
 165        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
 166        DEBUG_CAPA(D_SEC, c, "pack");
 167}
 168
 169static inline void osc_pack_req_body(struct ptlrpc_request *req,
 170                                     struct obd_info *oinfo)
 171{
 172        struct ost_body *body;
 173
 174        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 175        LASSERT(body);
 176
 177        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
 178                             oinfo->oi_oa);
 179        osc_pack_capa(req, body, oinfo->oi_capa);
 180}
 181
 182static inline void osc_set_capa_size(struct ptlrpc_request *req,
 183                                     const struct req_msg_field *field,
 184                                     struct obd_capa *oc)
 185{
 186        if (oc == NULL)
 187                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
 188        else
 189                /* it is already calculated as sizeof struct obd_capa */
 190                ;
 191}
 192
 193static int osc_getattr_interpret(const struct lu_env *env,
 194                                 struct ptlrpc_request *req,
 195                                 struct osc_async_args *aa, int rc)
 196{
 197        struct ost_body *body;
 198
 199        if (rc != 0)
 200                GOTO(out, rc);
 201
 202        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 203        if (body) {
 204                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
 205                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
 206                                     aa->aa_oi->oi_oa, &body->oa);
 207
 208                /* This should really be sent by the OST */
 209                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
 210                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
 211        } else {
 212                CDEBUG(D_INFO, "can't unpack ost_body\n");
 213                rc = -EPROTO;
 214                aa->aa_oi->oi_oa->o_valid = 0;
 215        }
 216out:
 217        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
 218        return rc;
 219}
 220
 221static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
 222                             struct ptlrpc_request_set *set)
 223{
 224        struct ptlrpc_request *req;
 225        struct osc_async_args *aa;
 226        int                 rc;
 227
 228        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
 229        if (req == NULL)
 230                return -ENOMEM;
 231
 232        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 233        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
 234        if (rc) {
 235                ptlrpc_request_free(req);
 236                return rc;
 237        }
 238
 239        osc_pack_req_body(req, oinfo);
 240
 241        ptlrpc_request_set_replen(req);
 242        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
 243
 244        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
 245        aa = ptlrpc_req_async_args(req);
 246        aa->aa_oi = oinfo;
 247
 248        ptlrpc_set_add_req(set, req);
 249        return 0;
 250}
 251
 252static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
 253                       struct obd_info *oinfo)
 254{
 255        struct ptlrpc_request *req;
 256        struct ost_body       *body;
 257        int                 rc;
 258
 259        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
 260        if (req == NULL)
 261                return -ENOMEM;
 262
 263        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 264        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
 265        if (rc) {
 266                ptlrpc_request_free(req);
 267                return rc;
 268        }
 269
 270        osc_pack_req_body(req, oinfo);
 271
 272        ptlrpc_request_set_replen(req);
 273
 274        rc = ptlrpc_queue_wait(req);
 275        if (rc)
 276                GOTO(out, rc);
 277
 278        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 279        if (body == NULL)
 280                GOTO(out, rc = -EPROTO);
 281
 282        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
 283        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
 284                             &body->oa);
 285
 286        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
 287        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
 288
 289 out:
 290        ptlrpc_req_finished(req);
 291        return rc;
 292}
 293
 294static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
 295                       struct obd_info *oinfo, struct obd_trans_info *oti)
 296{
 297        struct ptlrpc_request *req;
 298        struct ost_body       *body;
 299        int                 rc;
 300
 301        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
 302
 303        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
 304        if (req == NULL)
 305                return -ENOMEM;
 306
 307        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 308        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
 309        if (rc) {
 310                ptlrpc_request_free(req);
 311                return rc;
 312        }
 313
 314        osc_pack_req_body(req, oinfo);
 315
 316        ptlrpc_request_set_replen(req);
 317
 318        rc = ptlrpc_queue_wait(req);
 319        if (rc)
 320                GOTO(out, rc);
 321
 322        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 323        if (body == NULL)
 324                GOTO(out, rc = -EPROTO);
 325
 326        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
 327                             &body->oa);
 328
 329out:
 330        ptlrpc_req_finished(req);
 331        return rc;
 332}
 333
 334static int osc_setattr_interpret(const struct lu_env *env,
 335                                 struct ptlrpc_request *req,
 336                                 struct osc_setattr_args *sa, int rc)
 337{
 338        struct ost_body *body;
 339
 340        if (rc != 0)
 341                GOTO(out, rc);
 342
 343        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 344        if (body == NULL)
 345                GOTO(out, rc = -EPROTO);
 346
 347        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
 348                             &body->oa);
 349out:
 350        rc = sa->sa_upcall(sa->sa_cookie, rc);
 351        return rc;
 352}
 353
 354int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
 355                           struct obd_trans_info *oti,
 356                           obd_enqueue_update_f upcall, void *cookie,
 357                           struct ptlrpc_request_set *rqset)
 358{
 359        struct ptlrpc_request   *req;
 360        struct osc_setattr_args *sa;
 361        int                   rc;
 362
 363        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
 364        if (req == NULL)
 365                return -ENOMEM;
 366
 367        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 368        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
 369        if (rc) {
 370                ptlrpc_request_free(req);
 371                return rc;
 372        }
 373
 374        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
 375                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
 376
 377        osc_pack_req_body(req, oinfo);
 378
 379        ptlrpc_request_set_replen(req);
 380
 381        /* do mds to ost setattr asynchronously */
 382        if (!rqset) {
 383                /* Do not wait for response. */
 384                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 385        } else {
 386                req->rq_interpret_reply =
 387                        (ptlrpc_interpterer_t)osc_setattr_interpret;
 388
 389                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
 390                sa = ptlrpc_req_async_args(req);
 391                sa->sa_oa = oinfo->oi_oa;
 392                sa->sa_upcall = upcall;
 393                sa->sa_cookie = cookie;
 394
 395                if (rqset == PTLRPCD_SET)
 396                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 397                else
 398                        ptlrpc_set_add_req(rqset, req);
 399        }
 400
 401        return 0;
 402}
 403
 404static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
 405                             struct obd_trans_info *oti,
 406                             struct ptlrpc_request_set *rqset)
 407{
 408        return osc_setattr_async_base(exp, oinfo, oti,
 409                                      oinfo->oi_cb_up, oinfo, rqset);
 410}
 411
 412int osc_real_create(struct obd_export *exp, struct obdo *oa,
 413                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
 414{
 415        struct ptlrpc_request *req;
 416        struct ost_body       *body;
 417        struct lov_stripe_md  *lsm;
 418        int                 rc;
 419
 420        LASSERT(oa);
 421        LASSERT(ea);
 422
 423        lsm = *ea;
 424        if (!lsm) {
 425                rc = obd_alloc_memmd(exp, &lsm);
 426                if (rc < 0)
 427                        return rc;
 428        }
 429
 430        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
 431        if (req == NULL)
 432                GOTO(out, rc = -ENOMEM);
 433
 434        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
 435        if (rc) {
 436                ptlrpc_request_free(req);
 437                GOTO(out, rc);
 438        }
 439
 440        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 441        LASSERT(body);
 442
 443        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 444
 445        ptlrpc_request_set_replen(req);
 446
 447        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
 448            oa->o_flags == OBD_FL_DELORPHAN) {
 449                DEBUG_REQ(D_HA, req,
 450                          "delorphan from OST integration");
 451                /* Don't resend the delorphan req */
 452                req->rq_no_resend = req->rq_no_delay = 1;
 453        }
 454
 455        rc = ptlrpc_queue_wait(req);
 456        if (rc)
 457                GOTO(out_req, rc);
 458
 459        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 460        if (body == NULL)
 461                GOTO(out_req, rc = -EPROTO);
 462
 463        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
 464        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
 465
 466        oa->o_blksize = cli_brw_size(exp->exp_obd);
 467        oa->o_valid |= OBD_MD_FLBLKSZ;
 468
 469        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
 470         * have valid lsm_oinfo data structs, so don't go touching that.
 471         * This needs to be fixed in a big way.
 472         */
 473        lsm->lsm_oi = oa->o_oi;
 474        *ea = lsm;
 475
 476        if (oti != NULL) {
 477                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
 478
 479                if (oa->o_valid & OBD_MD_FLCOOKIE) {
 480                        if (!oti->oti_logcookies)
 481                                oti_alloc_cookies(oti, 1);
 482                        *oti->oti_logcookies = oa->o_lcookie;
 483                }
 484        }
 485
 486        CDEBUG(D_HA, "transno: %lld\n",
 487               lustre_msg_get_transno(req->rq_repmsg));
 488out_req:
 489        ptlrpc_req_finished(req);
 490out:
 491        if (rc && !*ea)
 492                obd_free_memmd(exp, &lsm);
 493        return rc;
 494}
 495
 496int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
 497                   obd_enqueue_update_f upcall, void *cookie,
 498                   struct ptlrpc_request_set *rqset)
 499{
 500        struct ptlrpc_request   *req;
 501        struct osc_setattr_args *sa;
 502        struct ost_body  *body;
 503        int                   rc;
 504
 505        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
 506        if (req == NULL)
 507                return -ENOMEM;
 508
 509        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 510        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
 511        if (rc) {
 512                ptlrpc_request_free(req);
 513                return rc;
 514        }
 515        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 516        ptlrpc_at_set_req_timeout(req);
 517
 518        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 519        LASSERT(body);
 520        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
 521                             oinfo->oi_oa);
 522        osc_pack_capa(req, body, oinfo->oi_capa);
 523
 524        ptlrpc_request_set_replen(req);
 525
 526        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
 527        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
 528        sa = ptlrpc_req_async_args(req);
 529        sa->sa_oa     = oinfo->oi_oa;
 530        sa->sa_upcall = upcall;
 531        sa->sa_cookie = cookie;
 532        if (rqset == PTLRPCD_SET)
 533                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 534        else
 535                ptlrpc_set_add_req(rqset, req);
 536
 537        return 0;
 538}
 539
 540static int osc_punch(const struct lu_env *env, struct obd_export *exp,
 541                     struct obd_info *oinfo, struct obd_trans_info *oti,
 542                     struct ptlrpc_request_set *rqset)
 543{
 544        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
 545        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
 546        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 547        return osc_punch_base(exp, oinfo,
 548                              oinfo->oi_cb_up, oinfo, rqset);
 549}
 550
 551static int osc_sync_interpret(const struct lu_env *env,
 552                              struct ptlrpc_request *req,
 553                              void *arg, int rc)
 554{
 555        struct osc_fsync_args *fa = arg;
 556        struct ost_body *body;
 557
 558        if (rc)
 559                GOTO(out, rc);
 560
 561        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 562        if (body == NULL) {
 563                CERROR ("can't unpack ost_body\n");
 564                GOTO(out, rc = -EPROTO);
 565        }
 566
 567        *fa->fa_oi->oi_oa = body->oa;
 568out:
 569        rc = fa->fa_upcall(fa->fa_cookie, rc);
 570        return rc;
 571}
 572
 573int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
 574                  obd_enqueue_update_f upcall, void *cookie,
 575                  struct ptlrpc_request_set *rqset)
 576{
 577        struct ptlrpc_request *req;
 578        struct ost_body       *body;
 579        struct osc_fsync_args *fa;
 580        int                 rc;
 581
 582        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
 583        if (req == NULL)
 584                return -ENOMEM;
 585
 586        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
 587        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
 588        if (rc) {
 589                ptlrpc_request_free(req);
 590                return rc;
 591        }
 592
 593        /* overload the size and blocks fields in the oa with start/end */
 594        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 595        LASSERT(body);
 596        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
 597                             oinfo->oi_oa);
 598        osc_pack_capa(req, body, oinfo->oi_capa);
 599
 600        ptlrpc_request_set_replen(req);
 601        req->rq_interpret_reply = osc_sync_interpret;
 602
 603        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
 604        fa = ptlrpc_req_async_args(req);
 605        fa->fa_oi = oinfo;
 606        fa->fa_upcall = upcall;
 607        fa->fa_cookie = cookie;
 608
 609        if (rqset == PTLRPCD_SET)
 610                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 611        else
 612                ptlrpc_set_add_req(rqset, req);
 613
 614        return 0;
 615}
 616
 617static int osc_sync(const struct lu_env *env, struct obd_export *exp,
 618                    struct obd_info *oinfo, obd_size start, obd_size end,
 619                    struct ptlrpc_request_set *set)
 620{
 621        if (!oinfo->oi_oa) {
 622                CDEBUG(D_INFO, "oa NULL\n");
 623                return -EINVAL;
 624        }
 625
 626        oinfo->oi_oa->o_size = start;
 627        oinfo->oi_oa->o_blocks = end;
 628        oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 629
 630        return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
 631}
 632
 633/* Find and cancel locally locks matched by @mode in the resource found by
 634 * @objid. Found locks are added into @cancel list. Returns the amount of
 635 * locks added to @cancels list. */
 636static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
 637                                   struct list_head *cancels,
 638                                   ldlm_mode_t mode, __u64 lock_flags)
 639{
 640        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
 641        struct ldlm_res_id res_id;
 642        struct ldlm_resource *res;
 643        int count;
 644
 645        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
 646         * export) but disabled through procfs (flag in NS).
 647         *
 648         * This distinguishes from a case when ELC is not supported originally,
 649         * when we still want to cancel locks in advance and just cancel them
 650         * locally, without sending any RPC. */
 651        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
 652                return 0;
 653
 654        ostid_build_res_name(&oa->o_oi, &res_id);
 655        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
 656        if (res == NULL)
 657                return 0;
 658
 659        LDLM_RESOURCE_ADDREF(res);
 660        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
 661                                           lock_flags, 0, NULL);
 662        LDLM_RESOURCE_DELREF(res);
 663        ldlm_resource_putref(res);
 664        return count;
 665}
 666
 667static int osc_destroy_interpret(const struct lu_env *env,
 668                                 struct ptlrpc_request *req, void *data,
 669                                 int rc)
 670{
 671        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
 672
 673        atomic_dec(&cli->cl_destroy_in_flight);
 674        wake_up(&cli->cl_destroy_waitq);
 675        return 0;
 676}
 677
 678static int osc_can_send_destroy(struct client_obd *cli)
 679{
 680        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
 681            cli->cl_max_rpcs_in_flight) {
 682                /* The destroy request can be sent */
 683                return 1;
 684        }
 685        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
 686            cli->cl_max_rpcs_in_flight) {
 687                /*
 688                 * The counter has been modified between the two atomic
 689                 * operations.
 690                 */
 691                wake_up(&cli->cl_destroy_waitq);
 692        }
 693        return 0;
 694}
 695
 696int osc_create(const struct lu_env *env, struct obd_export *exp,
 697               struct obdo *oa, struct lov_stripe_md **ea,
 698               struct obd_trans_info *oti)
 699{
 700        int rc = 0;
 701
 702        LASSERT(oa);
 703        LASSERT(ea);
 704        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 705
 706        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
 707            oa->o_flags == OBD_FL_RECREATE_OBJS) {
 708                return osc_real_create(exp, oa, ea, oti);
 709        }
 710
 711        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
 712                return osc_real_create(exp, oa, ea, oti);
 713
 714        /* we should not get here anymore */
 715        LBUG();
 716
 717        return rc;
 718}
 719
 720/* Destroy requests can be async always on the client, and we don't even really
 721 * care about the return code since the client cannot do anything at all about
 722 * a destroy failure.
 723 * When the MDS is unlinking a filename, it saves the file objects into a
 724 * recovery llog, and these object records are cancelled when the OST reports
 725 * they were destroyed and sync'd to disk (i.e. transaction committed).
 726 * If the client dies, or the OST is down when the object should be destroyed,
 727 * the records are not cancelled, and when the OST reconnects to the MDS next,
 728 * it will retrieve the llog unlink logs and then sends the log cancellation
 729 * cookies to the MDS after committing destroy transactions. */
 730static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
 731                       struct obdo *oa, struct lov_stripe_md *ea,
 732                       struct obd_trans_info *oti, struct obd_export *md_export,
 733                       void *capa)
 734{
 735        struct client_obd     *cli = &exp->exp_obd->u.cli;
 736        struct ptlrpc_request *req;
 737        struct ost_body       *body;
 738        LIST_HEAD(cancels);
 739        int rc, count;
 740
 741        if (!oa) {
 742                CDEBUG(D_INFO, "oa NULL\n");
 743                return -EINVAL;
 744        }
 745
 746        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
 747                                        LDLM_FL_DISCARD_DATA);
 748
 749        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
 750        if (req == NULL) {
 751                ldlm_lock_list_put(&cancels, l_bl_ast, count);
 752                return -ENOMEM;
 753        }
 754
 755        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
 756        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
 757                               0, &cancels, count);
 758        if (rc) {
 759                ptlrpc_request_free(req);
 760                return rc;
 761        }
 762
 763        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 764        ptlrpc_at_set_req_timeout(req);
 765
 766        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
 767                oa->o_lcookie = *oti->oti_logcookies;
 768        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
 769        LASSERT(body);
 770        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 771
 772        osc_pack_capa(req, body, (struct obd_capa *)capa);
 773        ptlrpc_request_set_replen(req);
 774
 775        /* If osc_destroy is for destroying the unlink orphan,
 776         * sent from MDT to OST, which should not be blocked here,
 777         * because the process might be triggered by ptlrpcd, and
 778         * it is not good to block ptlrpcd thread (b=16006)*/
 779        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
 780                req->rq_interpret_reply = osc_destroy_interpret;
 781                if (!osc_can_send_destroy(cli)) {
 782                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
 783                                                          NULL);
 784
 785                        /*
 786                         * Wait until the number of on-going destroy RPCs drops
 787                         * under max_rpc_in_flight
 788                         */
 789                        l_wait_event_exclusive(cli->cl_destroy_waitq,
 790                                               osc_can_send_destroy(cli), &lwi);
 791                }
 792        }
 793
 794        /* Do not wait for response */
 795        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 796        return 0;
 797}
 798
 799static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 800                                long writing_bytes)
 801{
 802        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
 803
 804        LASSERT(!(oa->o_valid & bits));
 805
 806        oa->o_valid |= bits;
 807        client_obd_list_lock(&cli->cl_loi_list_lock);
 808        oa->o_dirty = cli->cl_dirty;
 809        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
 810                     cli->cl_dirty_max)) {
 811                CERROR("dirty %lu - %lu > dirty_max %lu\n",
 812                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
 813                oa->o_undirty = 0;
 814        } else if (unlikely(atomic_read(&obd_dirty_pages) -
 815                            atomic_read(&obd_dirty_transit_pages) >
 816                            (long)(obd_max_dirty_pages + 1))) {
 817                /* The atomic_read() allowing the atomic_inc() are
 818                 * not covered by a lock thus they may safely race and trip
 819                 * this CERROR() unless we add in a small fudge factor (+1). */
 820                CERROR("dirty %d - %d > system dirty_max %d\n",
 821                       atomic_read(&obd_dirty_pages),
 822                       atomic_read(&obd_dirty_transit_pages),
 823                       obd_max_dirty_pages);
 824                oa->o_undirty = 0;
 825        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
 826                CERROR("dirty %lu - dirty_max %lu too big???\n",
 827                       cli->cl_dirty, cli->cl_dirty_max);
 828                oa->o_undirty = 0;
 829        } else {
 830                long max_in_flight = (cli->cl_max_pages_per_rpc <<
 831                                      PAGE_CACHE_SHIFT)*
 832                                     (cli->cl_max_rpcs_in_flight + 1);
 833                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
 834        }
 835        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
 836        oa->o_dropped = cli->cl_lost_grant;
 837        cli->cl_lost_grant = 0;
 838        client_obd_list_unlock(&cli->cl_loi_list_lock);
 839        CDEBUG(D_CACHE,"dirty: %llu undirty: %u dropped %u grant: %llu\n",
 840               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
 841
 842}
 843
 844void osc_update_next_shrink(struct client_obd *cli)
 845{
 846        cli->cl_next_shrink_grant =
 847                cfs_time_shift(cli->cl_grant_shrink_interval);
 848        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
 849               cli->cl_next_shrink_grant);
 850}
 851
 852static void __osc_update_grant(struct client_obd *cli, obd_size grant)
 853{
 854        client_obd_list_lock(&cli->cl_loi_list_lock);
 855        cli->cl_avail_grant += grant;
 856        client_obd_list_unlock(&cli->cl_loi_list_lock);
 857}
 858
 859static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 860{
 861        if (body->oa.o_valid & OBD_MD_FLGRANT) {
 862                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
 863                __osc_update_grant(cli, body->oa.o_grant);
 864        }
 865}
 866
 867static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 868                              obd_count keylen, void *key, obd_count vallen,
 869                              void *val, struct ptlrpc_request_set *set);
 870
 871static int osc_shrink_grant_interpret(const struct lu_env *env,
 872                                      struct ptlrpc_request *req,
 873                                      void *aa, int rc)
 874{
 875        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
 876        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
 877        struct ost_body *body;
 878
 879        if (rc != 0) {
 880                __osc_update_grant(cli, oa->o_grant);
 881                GOTO(out, rc);
 882        }
 883
 884        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 885        LASSERT(body);
 886        osc_update_grant(cli, body);
 887out:
 888        OBDO_FREE(oa);
 889        return rc;
 890}
 891
 892static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
 893{
 894        client_obd_list_lock(&cli->cl_loi_list_lock);
 895        oa->o_grant = cli->cl_avail_grant / 4;
 896        cli->cl_avail_grant -= oa->o_grant;
 897        client_obd_list_unlock(&cli->cl_loi_list_lock);
 898        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
 899                oa->o_valid |= OBD_MD_FLFLAGS;
 900                oa->o_flags = 0;
 901        }
 902        oa->o_flags |= OBD_FL_SHRINK_GRANT;
 903        osc_update_next_shrink(cli);
 904}
 905
 906/* Shrink the current grant, either from some large amount to enough for a
 907 * full set of in-flight RPCs, or if we have already shrunk to that limit
 908 * then to enough for a single RPC.  This avoids keeping more grant than
 909 * needed, and avoids shrinking the grant piecemeal. */
 910static int osc_shrink_grant(struct client_obd *cli)
 911{
 912        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
 913                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
 914
 915        client_obd_list_lock(&cli->cl_loi_list_lock);
 916        if (cli->cl_avail_grant <= target_bytes)
 917                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
 918        client_obd_list_unlock(&cli->cl_loi_list_lock);
 919
 920        return osc_shrink_grant_to_target(cli, target_bytes);
 921}
 922
 923int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 924{
 925        int                     rc = 0;
 926        struct ost_body *body;
 927
 928        client_obd_list_lock(&cli->cl_loi_list_lock);
 929        /* Don't shrink if we are already above or below the desired limit
 930         * We don't want to shrink below a single RPC, as that will negatively
 931         * impact block allocation and long-term performance. */
 932        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
 933                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
 934
 935        if (target_bytes >= cli->cl_avail_grant) {
 936                client_obd_list_unlock(&cli->cl_loi_list_lock);
 937                return 0;
 938        }
 939        client_obd_list_unlock(&cli->cl_loi_list_lock);
 940
 941        OBD_ALLOC_PTR(body);
 942        if (!body)
 943                return -ENOMEM;
 944
 945        osc_announce_cached(cli, &body->oa, 0);
 946
 947        client_obd_list_lock(&cli->cl_loi_list_lock);
 948        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
 949        cli->cl_avail_grant = target_bytes;
 950        client_obd_list_unlock(&cli->cl_loi_list_lock);
 951        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
 952                body->oa.o_valid |= OBD_MD_FLFLAGS;
 953                body->oa.o_flags = 0;
 954        }
 955        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
 956        osc_update_next_shrink(cli);
 957
 958        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
 959                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
 960                                sizeof(*body), body, NULL);
 961        if (rc != 0)
 962                __osc_update_grant(cli, body->oa.o_grant);
 963        OBD_FREE_PTR(body);
 964        return rc;
 965}
 966
 967static int osc_should_shrink_grant(struct client_obd *client)
 968{
 969        unsigned long time = cfs_time_current();
 970        unsigned long next_shrink = client->cl_next_shrink_grant;
 971
 972        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
 973             OBD_CONNECT_GRANT_SHRINK) == 0)
 974                return 0;
 975
 976        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
 977                /* Get the current RPC size directly, instead of going via:
 978                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
 979                 * Keep comment here so that it can be found by searching. */
 980                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
 981
 982                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
 983                    client->cl_avail_grant > brw_size)
 984                        return 1;
 985                else
 986                        osc_update_next_shrink(client);
 987        }
 988        return 0;
 989}
 990
 991static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
 992{
 993        struct client_obd *client;
 994
 995        list_for_each_entry(client, &item->ti_obd_list,
 996                                cl_grant_shrink_list) {
 997                if (osc_should_shrink_grant(client))
 998                        osc_shrink_grant(client);
 999        }
1000        return 0;
1001}
1002
1003static int osc_add_shrink_grant(struct client_obd *client)
1004{
1005        int rc;
1006
1007        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1008                                       TIMEOUT_GRANT,
1009                                       osc_grant_shrink_grant_cb, NULL,
1010                                       &client->cl_grant_shrink_list);
1011        if (rc) {
1012                CERROR("add grant client %s error %d\n",
1013                        client->cl_import->imp_obd->obd_name, rc);
1014                return rc;
1015        }
1016        CDEBUG(D_CACHE, "add grant client %s \n",
1017               client->cl_import->imp_obd->obd_name);
1018        osc_update_next_shrink(client);
1019        return 0;
1020}
1021
1022static int osc_del_shrink_grant(struct client_obd *client)
1023{
1024        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1025                                         TIMEOUT_GRANT);
1026}
1027
1028static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1029{
1030        /*
1031         * ocd_grant is the total grant amount we're expect to hold: if we've
1032         * been evicted, it's the new avail_grant amount, cl_dirty will drop
1033         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1034         *
1035         * race is tolerable here: if we're evicted, but imp_state already
1036         * left EVICTED state, then cl_dirty must be 0 already.
1037         */
1038        client_obd_list_lock(&cli->cl_loi_list_lock);
1039        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1040                cli->cl_avail_grant = ocd->ocd_grant;
1041        else
1042                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1043
1044        if (cli->cl_avail_grant < 0) {
1045                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1046                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1047                      ocd->ocd_grant, cli->cl_dirty);
1048                /* workaround for servers which do not have the patch from
1049                 * LU-2679 */
1050                cli->cl_avail_grant = ocd->ocd_grant;
1051        }
1052
1053        /* determine the appropriate chunk size used by osc_extent. */
1054        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1055        client_obd_list_unlock(&cli->cl_loi_list_lock);
1056
1057        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1058                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1059                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1060
1061        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1062            list_empty(&cli->cl_grant_shrink_list))
1063                osc_add_shrink_grant(cli);
1064}
1065
1066/* We assume that the reason this OSC got a short read is because it read
1067 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1068 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1069 * this stripe never got written at or beyond this stripe offset yet. */
1070static void handle_short_read(int nob_read, obd_count page_count,
1071                              struct brw_page **pga)
1072{
1073        char *ptr;
1074        int i = 0;
1075
1076        /* skip bytes read OK */
1077        while (nob_read > 0) {
1078                LASSERT (page_count > 0);
1079
1080                if (pga[i]->count > nob_read) {
1081                        /* EOF inside this page */
1082                        ptr = kmap(pga[i]->pg) +
1083                                (pga[i]->off & ~CFS_PAGE_MASK);
1084                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1085                        kunmap(pga[i]->pg);
1086                        page_count--;
1087                        i++;
1088                        break;
1089                }
1090
1091                nob_read -= pga[i]->count;
1092                page_count--;
1093                i++;
1094        }
1095
1096        /* zero remaining pages */
1097        while (page_count-- > 0) {
1098                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1099                memset(ptr, 0, pga[i]->count);
1100                kunmap(pga[i]->pg);
1101                i++;
1102        }
1103}
1104
1105static int check_write_rcs(struct ptlrpc_request *req,
1106                           int requested_nob, int niocount,
1107                           obd_count page_count, struct brw_page **pga)
1108{
1109        int     i;
1110        __u32   *remote_rcs;
1111
1112        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1113                                                  sizeof(*remote_rcs) *
1114                                                  niocount);
1115        if (remote_rcs == NULL) {
1116                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1117                return(-EPROTO);
1118        }
1119
1120        /* return error if any niobuf was in error */
1121        for (i = 0; i < niocount; i++) {
1122                if ((int)remote_rcs[i] < 0)
1123                        return(remote_rcs[i]);
1124
1125                if (remote_rcs[i] != 0) {
1126                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1127                                i, remote_rcs[i], req);
1128                        return(-EPROTO);
1129                }
1130        }
1131
1132        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1133                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1134                       req->rq_bulk->bd_nob_transferred, requested_nob);
1135                return(-EPROTO);
1136        }
1137
1138        return (0);
1139}
1140
1141static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1142{
1143        if (p1->flag != p2->flag) {
1144                unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1145                                  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1146
1147                /* warn if we try to combine flags that we don't know to be
1148                 * safe to combine */
1149                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1150                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1151                              "report this at http://bugs.whamcloud.com/\n",
1152                              p1->flag, p2->flag);
1153                }
1154                return 0;
1155        }
1156
1157        return (p1->off + p1->count == p2->off);
1158}
1159
1160static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1161                                   struct brw_page **pga, int opc,
1162                                   cksum_type_t cksum_type)
1163{
1164        __u32                           cksum;
1165        int                             i = 0;
1166        struct cfs_crypto_hash_desc     *hdesc;
1167        unsigned int                    bufsize;
1168        int                             err;
1169        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1170
1171        LASSERT(pg_count > 0);
1172
1173        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1174        if (IS_ERR(hdesc)) {
1175                CERROR("Unable to initialize checksum hash %s\n",
1176                       cfs_crypto_hash_name(cfs_alg));
1177                return PTR_ERR(hdesc);
1178        }
1179
1180        while (nob > 0 && pg_count > 0) {
1181                int count = pga[i]->count > nob ? nob : pga[i]->count;
1182
1183                /* corrupt the data before we compute the checksum, to
1184                 * simulate an OST->client data error */
1185                if (i == 0 && opc == OST_READ &&
1186                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1187                        unsigned char *ptr = kmap(pga[i]->pg);
1188                        int off = pga[i]->off & ~CFS_PAGE_MASK;
1189                        memcpy(ptr + off, "bad1", min(4, nob));
1190                        kunmap(pga[i]->pg);
1191                }
1192                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1193                                  pga[i]->off & ~CFS_PAGE_MASK,
1194                                  count);
1195                CDEBUG(D_PAGE,
1196                       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1197                       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1198                       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1199                       page_private(pga[i]->pg),
1200                       (int)(pga[i]->off & ~CFS_PAGE_MASK));
1201
1202                nob -= pga[i]->count;
1203                pg_count--;
1204                i++;
1205        }
1206
1207        bufsize = 4;
1208        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1209
1210        if (err)
1211                cfs_crypto_hash_final(hdesc, NULL, NULL);
1212
1213        /* For sending we only compute the wrong checksum instead
1214         * of corrupting the data so it is still correct on a redo */
1215        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1216                cksum++;
1217
1218        return cksum;
1219}
1220
 /*
  * Prepare a bulk read or write request for the page_count pages in pga.
  *
  * cmd:        OBD_BRW_WRITE selects OST_WRITE (request taken from the import's
  *             request pool), anything else selects OST_READ
  * cli:        client the request is built for
  * oa:         attributes copied into the wire body; on the write path its
  *             o_valid/o_flags/o_cksum are updated as well, and the pointer is
  *             kept in the request's async args
  * lsm:        not referenced in this function
  * page_count: number of entries in pga, must be > 0 (asserted)
  * pga:        pages to transfer; asserted to have strictly increasing offsets
  *             and no gap in the middle of the array
  * reqp:       receives the prepared request on success
  * ocapa:      capability packed into the request; a reference is taken with
  *             capa_get() when reserve is non-zero as well
  * resend:     non-zero marks the request with OBD_FL_RECOV_RESEND
  *
  * Returns 0 on success, -ENOMEM when the request or the bulk descriptor
  * cannot be allocated, the error from ptlrpc_request_pack(), or the
  * -ENOMEM/-EINVAL values produced by the two OBD_FAIL_CHECK() hooks.
  */
 1221static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1222                                struct lov_stripe_md *lsm, obd_count page_count,
1223                                struct brw_page **pga,
1224                                struct ptlrpc_request **reqp,
1225                                struct obd_capa *ocapa, int reserve,
1226                                int resend)
1227{
1228        struct ptlrpc_request   *req;
1229        struct ptlrpc_bulk_desc *desc;
1230        struct ost_body  *body;
1231        struct obd_ioobj        *ioobj;
1232        struct niobuf_remote    *niobuf;
1233        int niocount, i, requested_nob, opc, rc;
1234        struct osc_brw_async_args *aa;
1235        struct req_capsule      *pill;
1236        struct brw_page *pg_prev;
1237
1238        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1239                return -ENOMEM; /* Recoverable */
1240        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1241                return -EINVAL; /* Fatal */
1242
1243        if ((cmd & OBD_BRW_WRITE) != 0) {
1244                opc = OST_WRITE;
1245                req = ptlrpc_request_alloc_pool(cli->cl_import,
1246                                                cli->cl_import->imp_rq_pool,
1247                                                &RQF_OST_BRW_WRITE);
1248        } else {
1249                opc = OST_READ;
1250                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1251        }
1252        if (req == NULL)
1253                return -ENOMEM;
1254
        /* count the remote niobufs needed: a page starts a new niobuf unless
         * can_merge_pages() allows it to extend the one of its predecessor */
1255        for (niocount = i = 1; i < page_count; i++) {
1256                if (!can_merge_pages(pga[i - 1], pga[i]))
1257                        niocount++;
1258        }
1259
1260        pill = &req->rq_pill;
1261        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1262                             sizeof(*ioobj));
1263        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1264                             niocount * sizeof(*niobuf));
1265        osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1266
1267        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1268        if (rc) {
1269                ptlrpc_request_free(req);
1270                return rc;
1271        }
1272        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1273        ptlrpc_at_set_req_timeout(req);
1274        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1275         * retry logic */
1276        req->rq_no_retry_einprogress = 1;
1277
1278        desc = ptlrpc_prep_bulk_imp(req, page_count,
1279                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1280                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1281                OST_BULK_PORTAL);
1282
1283        if (desc == NULL)
1284                GOTO(out, rc = -ENOMEM);
1285        /* NB request now owns desc and will free it when it gets freed */
1286
1287        body = req_capsule_client_get(pill, &RMF_OST_BODY);
1288        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1289        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1290        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1291
1292        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1293
1294        obdo_to_ioobj(oa, ioobj);
1295        ioobj->ioo_bufcnt = niocount;
1296        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1297         * that might be send for this request.  The actual number is decided
1298         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1299         * "max - 1" for old client compatibility sending "0", and also so the
1300         * the actual maximum is a power-of-two number, not one less. LU-1431 */
1301        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1302        osc_pack_capa(req, body, ocapa);
1303        LASSERT(page_count > 0);
1304        pg_prev = pga[0];
        /* add every page to the bulk descriptor and describe it in the
         * niobuf array, summing up the request size in requested_nob */
1305        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1306                struct brw_page *pg = pga[i];
1307                int poff = pg->off & ~CFS_PAGE_MASK;
1308
1309                LASSERT(pg->count > 0);
1310                /* make sure there is no gap in the middle of page array */
1311                LASSERTF(page_count == 1 ||
1312                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1313                          ergo(i > 0 && i < page_count - 1,
1314                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1315                          ergo(i == page_count - 1, poff == 0)),
1316                         "i: %d/%d pg: %p off: %llu, count: %u\n",
1317                         i, page_count, pg, pg->off, pg->count);
1318                LASSERTF(i == 0 || pg->off > pg_prev->off,
1319                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1320                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
1321                         i, page_count,
1322                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1323                         pg_prev->pg, page_private(pg_prev->pg),
1324                         pg_prev->pg->index, pg_prev->off);
1325                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1326                        (pg->flag & OBD_BRW_SRVLOCK));
1327
1328                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1329                requested_nob += pg->count;
1330
                /* a mergeable page only lengthens the previous niobuf: step
                 * back to it, the loop's niobuf++ moves forward again */
1331                if (i > 0 && can_merge_pages(pg_prev, pg)) {
1332                        niobuf--;
1333                        niobuf->len += pg->count;
1334                } else {
1335                        niobuf->offset = pg->off;
1336                        niobuf->len    = pg->count;
1337                        niobuf->flags  = pg->flag;
1338                }
1339                pg_prev = pg;
1340        }
1341
        /* niobuf must now sit exactly niocount entries past the start of the
         * niobuf array, i.e. both loops agreed on the number of niobufs */
1342        LASSERTF((void *)(niobuf - niocount) ==
1343                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1344                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1345                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1346
1347        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1348        if (resend) {
1349                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1350                        body->oa.o_valid |= OBD_MD_FLFLAGS;
1351                        body->oa.o_flags = 0;
1352                }
1353                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1354        }
1355
1356        if (osc_should_shrink_grant(cli))
1357                osc_shrink_grant_local(cli, &body->oa);
1358
1359        /* size[REQ_REC_OFF] still sizeof (*body) */
1360        if (opc == OST_WRITE) {
1361                if (cli->cl_checksum &&
1362                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1363                        /* store cl_cksum_type in a local variable since
1364                         * it can be changed via lprocfs */
1365                        cksum_type_t cksum_type = cli->cl_cksum_type;
1366
1367                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1368                                oa->o_flags &= OBD_FL_LOCAL_MASK;
1369                                body->oa.o_flags = 0;
1370                        }
1371                        body->oa.o_flags |= cksum_type_pack(cksum_type);
1372                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1373                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1374                                                             page_count, pga,
1375                                                             OST_WRITE,
1376                                                             cksum_type);
1377                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1378                               body->oa.o_cksum);
1379                        /* save this in 'oa', too, for later checking */
1380                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1381                        oa->o_flags |= cksum_type_pack(cksum_type);
1382                } else {
1383                        /* clear out the checksum flag, in case this is a
1384                         * resend but cl_checksum is no longer set. b=11238 */
1385                        oa->o_valid &= ~OBD_MD_FLCKSUM;
1386                }
1387                oa->o_cksum = body->oa.o_cksum;
1388                /* 1 RC per niobuf */
1389                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1390                                     sizeof(__u32) * niocount);
1391        } else {
                /* reads only announce the checksum type in the request; no
                 * checksum is computed on this path */
1392                if (cli->cl_checksum &&
1393                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1395                                body->oa.o_flags = 0;
1396                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1397                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398                }
1399        }
1400        ptlrpc_request_set_replen(req);
1401
        /* record in the request's async args what was asked for */
1402        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1403        aa = ptlrpc_req_async_args(req);
1404        aa->aa_oa = oa;
1405        aa->aa_requested_nob = requested_nob;
1406        aa->aa_nio_count = niocount;
1407        aa->aa_page_count = page_count;
1408        aa->aa_resends = 0;
1409        aa->aa_ppga = pga;
1410        aa->aa_cli = cli;
1411        INIT_LIST_HEAD(&aa->aa_oaps);
1412        if (ocapa && reserve)
1413                aa->aa_ocapa = capa_get(ocapa);
1414
1415        *reqp = req;
1416        return 0;
1417
        /* only reached when the bulk descriptor could not be allocated */
1418 out:
1419        ptlrpc_req_finished(req);
1420        return rc;
1421}
1422
1423static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1424                                __u32 client_cksum, __u32 server_cksum, int nob,
1425                                obd_count page_count, struct brw_page **pga,
1426                                cksum_type_t client_cksum_type)
1427{
1428        __u32 new_cksum;
1429        char *msg;
1430        cksum_type_t cksum_type;
1431
1432        if (server_cksum == client_cksum) {
1433                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1434                return 0;
1435        }
1436
1437        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1438                                       oa->o_flags : 0);
1439        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1440                                      cksum_type);
1441
1442        if (cksum_type != client_cksum_type)
1443                msg = "the server did not use the checksum type specified in "
1444                      "the original request - likely a protocol problem";
1445        else if (new_cksum == server_cksum)
1446                msg = "changed on the client after we checksummed it - "
1447                      "likely false positive due to mmap IO (bug 11742)";
1448        else if (new_cksum == client_cksum)
1449                msg = "changed in transit before arrival at OST";
1450        else
1451                msg = "changed in transit AND doesn't match the original - "
1452                      "likely false positive due to mmap IO (bug 11742)";
1453
1454        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1455                           " object "DOSTID" extent [%llu-%llu]\n",
1456                           msg, libcfs_nid2str(peer->nid),
1457                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1458                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1459                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1460                           POSTID(&oa->o_oi), pga[0]->off,
1461                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
1462        CERROR("original client csum %x (type %x), server csum %x (type %x), "
1463               "client csum now %x\n", client_cksum, client_cksum_type,
1464               server_cksum, cksum_type, new_cksum);
1465        return 1;
1466}
1467
1468/* Note rc enters this function as number of bytes transferred */
1469static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1470{
1471        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1472        const lnet_process_id_t *peer =
1473                        &req->rq_import->imp_connection->c_peer;
1474        struct client_obd *cli = aa->aa_cli;
1475        struct ost_body *body;
1476        __u32 client_cksum = 0;
1477
1478        if (rc < 0 && rc != -EDQUOT) {
1479                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1480                return rc;
1481        }
1482
1483        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1484        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1485        if (body == NULL) {
1486                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1487                return -EPROTO;
1488        }
1489
1490        /* set/clear over quota flag for a uid/gid */
1491        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1492            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1493                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1494
1495                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1496                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1497                       body->oa.o_flags);
1498                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1499        }
1500
1501        osc_update_grant(cli, body);
1502
1503        if (rc < 0)
1504                return rc;
1505
1506        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507                client_cksum = aa->aa_oa->o_cksum; /* save for later */
1508
1509        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510                if (rc > 0) {
1511                        CERROR("Unexpected +ve rc %d\n", rc);
1512                        return -EPROTO;
1513                }
1514                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1515
1516                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517                        return -EAGAIN;
1518
1519                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520                    check_write_checksum(&body->oa, peer, client_cksum,
1521                                         body->oa.o_cksum, aa->aa_requested_nob,
1522                                         aa->aa_page_count, aa->aa_ppga,
1523                                         cksum_type_unpack(aa->aa_oa->o_flags)))
1524                        return -EAGAIN;
1525
1526                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1527                                     aa->aa_page_count, aa->aa_ppga);
1528                GOTO(out, rc);
1529        }
1530
1531        /* The rest of this function executes only for OST_READs */
1532
1533        /* if unwrap_bulk failed, return -EAGAIN to retry */
1534        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1535        if (rc < 0)
1536                GOTO(out, rc = -EAGAIN);
1537
1538        if (rc > aa->aa_requested_nob) {
1539                CERROR("Unexpected rc %d (%d requested)\n", rc,
1540                       aa->aa_requested_nob);
1541                return -EPROTO;
1542        }
1543
1544        if (rc != req->rq_bulk->bd_nob_transferred) {
1545                CERROR ("Unexpected rc %d (%d transferred)\n",
1546                        rc, req->rq_bulk->bd_nob_transferred);
1547                return (-EPROTO);
1548        }
1549
1550        if (rc < aa->aa_requested_nob)
1551                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1552
1553        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1554                static int cksum_counter;
1555                __u32      server_cksum = body->oa.o_cksum;
1556                char      *via;
1557                char      *router;
1558                cksum_type_t cksum_type;
1559
1560                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1561                                               body->oa.o_flags : 0);
1562                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1563                                                 aa->aa_ppga, OST_READ,
1564                                                 cksum_type);
1565
1566                if (peer->nid == req->rq_bulk->bd_sender) {
1567                        via = router = "";
1568                } else {
1569                        via = " via ";
1570                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
1571                }
1572
1573                if (server_cksum != client_cksum) {
1574                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1575                                           "%s%s%s inode "DFID" object "DOSTID
1576                                           " extent [%llu-%llu]\n",
1577                                           req->rq_import->imp_obd->obd_name,
1578                                           libcfs_nid2str(peer->nid),
1579                                           via, router,
1580                                           body->oa.o_valid & OBD_MD_FLFID ?
1581                                                body->oa.o_parent_seq : (__u64)0,
1582                                           body->oa.o_valid & OBD_MD_FLFID ?
1583                                                body->oa.o_parent_oid : 0,
1584                                           body->oa.o_valid & OBD_MD_FLFID ?
1585                                                body->oa.o_parent_ver : 0,
1586                                           POSTID(&body->oa.o_oi),
1587                                           aa->aa_ppga[0]->off,
1588                                           aa->aa_ppga[aa->aa_page_count-1]->off +
1589                                           aa->aa_ppga[aa->aa_page_count-1]->count -
1590                                                                        1);
1591                        CERROR("client %x, server %x, cksum_type %x\n",
1592                               client_cksum, server_cksum, cksum_type);
1593                        cksum_counter = 0;
1594                        aa->aa_oa->o_cksum = client_cksum;
1595                        rc = -EAGAIN;
1596                } else {
1597                        cksum_counter++;
1598                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1599                        rc = 0;
1600                }
1601        } else if (unlikely(client_cksum)) {
1602                static int cksum_missed;
1603
1604                cksum_missed++;
1605                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1606                        CERROR("Checksum %u requested from %s but not sent\n",
1607                               cksum_missed, libcfs_nid2str(peer->nid));
1608        } else {
1609                rc = 0;
1610        }
1611out:
1612        if (rc >= 0)
1613                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1614                                     aa->aa_oa, &body->oa);
1615
1616        return rc;
1617}
1618
1619static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1620                            struct lov_stripe_md *lsm,
1621                            obd_count page_count, struct brw_page **pga,
1622                            struct obd_capa *ocapa)
1623{
1624        struct ptlrpc_request *req;
1625        int                 rc;
1626        wait_queue_head_t           waitq;
1627        int                 generation, resends = 0;
1628        struct l_wait_info     lwi;
1629
1630        init_waitqueue_head(&waitq);
1631        generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1632
1633restart_bulk:
1634        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1635                                  page_count, pga, &req, ocapa, 0, resends);
1636        if (rc != 0)
1637                return (rc);
1638
1639        if (resends) {
1640                req->rq_generation_set = 1;
1641                req->rq_import_generation = generation;
1642                req->rq_sent = get_seconds() + resends;
1643        }
1644
1645        rc = ptlrpc_queue_wait(req);
1646
1647        if (rc == -ETIMEDOUT && req->rq_resend) {
1648                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1649                ptlrpc_req_finished(req);
1650                goto restart_bulk;
1651        }
1652
1653        rc = osc_brw_fini_request(req, rc);
1654
1655        ptlrpc_req_finished(req);
1656        /* When server return -EINPROGRESS, client should always retry
1657         * regardless of the number of times the bulk was resent already.*/
1658        if (osc_recoverable_error(rc)) {
1659                resends++;
1660                if (rc != -EINPROGRESS &&
1661                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1662                        CERROR("%s: too many resend retries for object: "
1663                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1664                               POSTID(&oa->o_oi), rc);
1665                        goto out;
1666                }
1667                if (generation !=
1668                    exp->exp_obd->u.cli.cl_import->imp_generation) {
1669                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
1670                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1671                               POSTID(&oa->o_oi), rc);
1672                        goto out;
1673                }
1674
1675                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1676                                       NULL);
1677                l_wait_event(waitq, 0, &lwi);
1678
1679                goto restart_bulk;
1680        }
1681out:
1682        if (rc == -EAGAIN || rc == -EINPROGRESS)
1683                rc = -EIO;
1684        return rc;
1685}
1686
1687static int osc_brw_redo_request(struct ptlrpc_request *request,
1688                                struct osc_brw_async_args *aa, int rc)
1689{
1690        struct ptlrpc_request *new_req;
1691        struct osc_brw_async_args *new_aa;
1692        struct osc_async_page *oap;
1693
1694        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1695                  "redo for recoverable error %d", rc);
1696
1697        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1698                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1699                                  aa->aa_cli, aa->aa_oa,
1700                                  NULL /* lsm unused by osc currently */,
1701                                  aa->aa_page_count, aa->aa_ppga,
1702                                  &new_req, aa->aa_ocapa, 0, 1);
1703        if (rc)
1704                return rc;
1705
1706        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1707                if (oap->oap_request != NULL) {
1708                        LASSERTF(request == oap->oap_request,
1709                                 "request %p != oap_request %p\n",
1710                                 request, oap->oap_request);
1711                        if (oap->oap_interrupted) {
1712                                ptlrpc_req_finished(new_req);
1713                                return -EINTR;
1714                        }
1715                }
1716        }
1717        /* New request takes over pga and oaps from old request.
1718         * Note that copying a list_head doesn't work, need to move it... */
1719        aa->aa_resends++;
1720        new_req->rq_interpret_reply = request->rq_interpret_reply;
1721        new_req->rq_async_args = request->rq_async_args;
1722        /* cap resend delay to the current request timeout, this is similar to
1723         * what ptlrpc does (see after_reply()) */
1724        if (aa->aa_resends > new_req->rq_timeout)
1725                new_req->rq_sent = get_seconds() + new_req->rq_timeout;
1726        else
1727                new_req->rq_sent = get_seconds() + aa->aa_resends;
1728        new_req->rq_generation_set = 1;
1729        new_req->rq_import_generation = request->rq_import_generation;
1730
1731        new_aa = ptlrpc_req_async_args(new_req);
1732
1733        INIT_LIST_HEAD(&new_aa->aa_oaps);
1734        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1735        INIT_LIST_HEAD(&new_aa->aa_exts);
1736        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1737        new_aa->aa_resends = aa->aa_resends;
1738
1739        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1740                if (oap->oap_request) {
1741                        ptlrpc_req_finished(oap->oap_request);
1742                        oap->oap_request = ptlrpc_request_addref(new_req);
1743                }
1744        }
1745
1746        new_aa->aa_ocapa = aa->aa_ocapa;
1747        aa->aa_ocapa = NULL;
1748
1749        /* XXX: This code will run into problem if we're going to support
1750         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1751         * and wait for all of them to be finished. We should inherit request
1752         * set from old request. */
1753        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1754
1755        DEBUG_REQ(D_INFO, new_req, "new request");
1756        return 0;
1757}
1758
1759/*
1760 * ugh, we want disk allocation on the target to happen in offset order.  we'll
1761 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1762 * fine for our small page arrays and doesn't require allocation.  its an
1763 * insertion sort that swaps elements that are strides apart, shrinking the
1764 * stride down until its '1' and the array is sorted.
1765 */
1766static void sort_brw_pages(struct brw_page **array, int num)
1767{
1768        int stride, i, j;
1769        struct brw_page *tmp;
1770
1771        if (num == 1)
1772                return;
1773        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1774                ;
1775
1776        do {
1777                stride /= 3;
1778                for (i = stride ; i < num ; i++) {
1779                        tmp = array[i];
1780                        j = i;
1781                        while (j >= stride && array[j - stride]->off > tmp->off) {
1782                                array[j] = array[j - stride];
1783                                j -= stride;
1784                        }
1785                        array[j] = tmp;
1786                }
1787        } while (stride > 1);
1788}
1789
1790static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1791{
1792        int count = 1;
1793        int offset;
1794        int i = 0;
1795
1796        LASSERT (pages > 0);
1797        offset = pg[i]->off & ~CFS_PAGE_MASK;
1798
1799        for (;;) {
1800                pages--;
1801                if (pages == 0)  /* that's all */
1802                        return count;
1803
1804                if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1805                        return count;   /* doesn't end on page boundary */
1806
1807                i++;
1808                offset = pg[i]->off & ~CFS_PAGE_MASK;
1809                if (offset != 0)        /* doesn't start on page boundary */
1810                        return count;
1811
1812                count++;
1813        }
1814}
1815
1816static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1817{
1818        struct brw_page **ppga;
1819        int i;
1820
1821        OBD_ALLOC(ppga, sizeof(*ppga) * count);
1822        if (ppga == NULL)
1823                return NULL;
1824
1825        for (i = 0; i < count; i++)
1826                ppga[i] = pga + i;
1827        return ppga;
1828}
1829
1830static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1831{
1832        LASSERT(ppga != NULL);
1833        OBD_FREE(ppga, sizeof(*ppga) * count);
1834}
1835
1836static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1837                   obd_count page_count, struct brw_page *pga,
1838                   struct obd_trans_info *oti)
1839{
1840        struct obdo *saved_oa = NULL;
1841        struct brw_page **ppga, **orig;
1842        struct obd_import *imp = class_exp2cliimp(exp);
1843        struct client_obd *cli;
1844        int rc, page_count_orig;
1845
1846        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1847        cli = &imp->imp_obd->u.cli;
1848
1849        if (cmd & OBD_BRW_CHECK) {
1850                /* The caller just wants to know if there's a chance that this
1851                 * I/O can succeed */
1852
1853                if (imp->imp_invalid)
1854                        return -EIO;
1855                return 0;
1856        }
1857
1858        /* test_brw with a failed create can trip this, maybe others. */
1859        LASSERT(cli->cl_max_pages_per_rpc);
1860
1861        rc = 0;
1862
1863        orig = ppga = osc_build_ppga(pga, page_count);
1864        if (ppga == NULL)
1865                return -ENOMEM;
1866        page_count_orig = page_count;
1867
1868        sort_brw_pages(ppga, page_count);
1869        while (page_count) {
1870                obd_count pages_per_brw;
1871
1872                if (page_count > cli->cl_max_pages_per_rpc)
1873                        pages_per_brw = cli->cl_max_pages_per_rpc;
1874                else
1875                        pages_per_brw = page_count;
1876
1877                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1878
1879                if (saved_oa != NULL) {
1880                        /* restore previously saved oa */
1881                        *oinfo->oi_oa = *saved_oa;
1882                } else if (page_count > pages_per_brw) {
1883                        /* save a copy of oa (brw will clobber it) */
1884                        OBDO_ALLOC(saved_oa);
1885                        if (saved_oa == NULL)
1886                                GOTO(out, rc = -ENOMEM);
1887                        *saved_oa = *oinfo->oi_oa;
1888                }
1889
1890                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1891                                      pages_per_brw, ppga, oinfo->oi_capa);
1892
1893                if (rc != 0)
1894                        break;
1895
1896                page_count -= pages_per_brw;
1897                ppga += pages_per_brw;
1898        }
1899
1900out:
1901        osc_release_ppga(orig, page_count_orig);
1902
1903        if (saved_oa != NULL)
1904                OBDO_FREE(saved_oa);
1905
1906        return rc;
1907}
1908
1909static int brw_interpret(const struct lu_env *env,
1910                         struct ptlrpc_request *req, void *data, int rc)
1911{
1912        struct osc_brw_async_args *aa = data;
1913        struct osc_extent *ext;
1914        struct osc_extent *tmp;
1915        struct cl_object  *obj = NULL;
1916        struct client_obd *cli = aa->aa_cli;
1917
1918        rc = osc_brw_fini_request(req, rc);
1919        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1920        /* When server return -EINPROGRESS, client should always retry
1921         * regardless of the number of times the bulk was resent already. */
1922        if (osc_recoverable_error(rc)) {
1923                if (req->rq_import_generation !=
1924                    req->rq_import->imp_generation) {
1925                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
1926                               ""DOSTID", rc = %d.\n",
1927                               req->rq_import->imp_obd->obd_name,
1928                               POSTID(&aa->aa_oa->o_oi), rc);
1929                } else if (rc == -EINPROGRESS ||
1930                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
1931                        rc = osc_brw_redo_request(req, aa, rc);
1932                } else {
1933                        CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1934                               req->rq_import->imp_obd->obd_name,
1935                               POSTID(&aa->aa_oa->o_oi), rc);
1936                }
1937
1938                if (rc == 0)
1939                        return 0;
1940                else if (rc == -EAGAIN || rc == -EINPROGRESS)
1941                        rc = -EIO;
1942        }
1943
1944        if (aa->aa_ocapa) {
1945                capa_put(aa->aa_ocapa);
1946                aa->aa_ocapa = NULL;
1947        }
1948
1949        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1950                if (obj == NULL && rc == 0) {
1951                        obj = osc2cl(ext->oe_obj);
1952                        cl_object_get(obj);
1953                }
1954
1955                list_del_init(&ext->oe_link);
1956                osc_extent_finish(env, ext, 1, rc);
1957        }
1958        LASSERT(list_empty(&aa->aa_exts));
1959        LASSERT(list_empty(&aa->aa_oaps));
1960
1961        if (obj != NULL) {
1962                struct obdo *oa = aa->aa_oa;
1963                struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1964                unsigned long valid = 0;
1965
1966                LASSERT(rc == 0);
1967                if (oa->o_valid & OBD_MD_FLBLOCKS) {
1968                        attr->cat_blocks = oa->o_blocks;
1969                        valid |= CAT_BLOCKS;
1970                }
1971                if (oa->o_valid & OBD_MD_FLMTIME) {
1972                        attr->cat_mtime = oa->o_mtime;
1973                        valid |= CAT_MTIME;
1974                }
1975                if (oa->o_valid & OBD_MD_FLATIME) {
1976                        attr->cat_atime = oa->o_atime;
1977                        valid |= CAT_ATIME;
1978                }
1979                if (oa->o_valid & OBD_MD_FLCTIME) {
1980                        attr->cat_ctime = oa->o_ctime;
1981                        valid |= CAT_CTIME;
1982                }
1983                if (valid != 0) {
1984                        cl_object_attr_lock(obj);
1985                        cl_object_attr_set(env, obj, attr, valid);
1986                        cl_object_attr_unlock(obj);
1987                }
1988                cl_object_put(env, obj);
1989        }
1990        OBDO_FREE(aa->aa_oa);
1991
1992        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1993                          req->rq_bulk->bd_nob_transferred);
1994        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1995        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1996
1997        client_obd_list_lock(&cli->cl_loi_list_lock);
1998        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1999         * is called so we know whether to go to sync BRWs or wait for more
2000         * RPCs to complete */
2001        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2002                cli->cl_w_in_flight--;
2003        else
2004                cli->cl_r_in_flight--;
2005        osc_wake_cache_waiters(cli);
2006        client_obd_list_unlock(&cli->cl_loi_list_lock);
2007
2008        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2009        return rc;
2010}
2011
2012/**
2013 * Build an RPC by the list of extent @ext_list. The caller must ensure
2014 * that the total pages in this list are NOT over max pages per RPC.
2015 * Extents in the list must be in OES_RPC state.
     *
     * Every page of every extent is gathered into one brw_page pointer array,
     * a cl_req is allocated and prepared for those pages, and one bulk request
     * is built with osc_brw_prep_request() and handed to ptlrpcd_add_req().
     * The request's reply is interpreted by brw_interpret().
     *
     * \param env       environment passed through to the cl_req and extent calls
     * \param cli       client obd; its in-flight counters and histograms are
     *                  updated under cl_loi_list_lock
     * \param ext_list  extents to send; spliced onto the request's aa_exts on
     *                  success, emptied through osc_extent_finish() on failure
     * \param cmd       the OBD_BRW_WRITE bit selects CRT_WRITE, else CRT_READ
     * \param pol       policy passed unchanged to ptlrpcd_add_req()
     *
     * \retval 0        the request was queued
     * \retval negative -ENOMEM, or the error of cl_req_alloc(), cl_req_prep()
     *                  or osc_brw_prep_request(); no request was queued
2016 */
2017int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2018                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
2019{
2020        struct ptlrpc_request           *req = NULL;
2021        struct osc_extent               *ext;
2022        struct brw_page                 **pga = NULL;
2023        struct osc_brw_async_args       *aa = NULL;
2024        struct obdo                     *oa = NULL;
2025        struct osc_async_page           *oap;
2026        struct osc_async_page           *tmp;
2027        struct cl_req                   *clerq = NULL;
2028        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2029                                                                      CRT_READ;
2030        struct ldlm_lock                *lock = NULL;
2031        struct cl_req_attr              *crattr = NULL;
2032        obd_off                         starting_offset = OBD_OBJECT_EOF;
2033        obd_off                         ending_offset = 0;
2034        int                             mpflag = 0;
2035        int                             mem_tight = 0;
2036        int                             page_count = 0;
2037        int                             i;
2038        int                             rc;
2039        LIST_HEAD(rpc_list);
2040
2041        LASSERT(!list_empty(ext_list));
2042
2043        /* add pages into rpc_list to build BRW rpc */
2044        list_for_each_entry(ext, ext_list, oe_link) {
2045                LASSERT(ext->oe_state == OES_RPC);
2046                mem_tight |= ext->oe_memalloc;
2047                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2048                        ++page_count;
2049                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                            /* Track the lowest start and the highest end
                             * offset seen so far.  A page that does not
                             * extend the range on one side is asserted to
                             * start at offset 0 within its page, respectively
                             * to end exactly at PAGE_CACHE_SIZE. */
2050                        if (starting_offset > oap->oap_obj_off)
2051                                starting_offset = oap->oap_obj_off;
2052                        else
2053                                LASSERT(oap->oap_page_off == 0);
2054                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
2055                                ending_offset = oap->oap_obj_off +
2056                                                oap->oap_count;
2057                        else
2058                                LASSERT(oap->oap_page_off + oap->oap_count ==
2059                                        PAGE_CACHE_SIZE);
2060                }
2061        }
2062
            /* At least one extent had oe_memalloc set.  mpflag keeps whatever
             * cfs_memory_pressure_get_and_set() returns and is handed back to
             * cfs_memory_pressure_restore() at 'out' (presumably the previous
             * memory-pressure state of this thread - confirm in libcfs). */
2063        if (mem_tight)
2064                mpflag = cfs_memory_pressure_get_and_set();
2065
2066        OBD_ALLOC(crattr, sizeof(*crattr));
2067        if (crattr == NULL)
2068                GOTO(out, rc = -ENOMEM);
2069
2070        OBD_ALLOC(pga, sizeof(*pga) * page_count);
2071        if (pga == NULL)
2072                GOTO(out, rc = -ENOMEM);
2073
2074        OBDO_ALLOC(oa);
2075        if (oa == NULL)
2076                GOTO(out, rc = -ENOMEM);
2077
            /* Fill pga[] in rpc_list order.  The cl_req is allocated when the
             * first page is visited, and the oap of that first page also
             * supplies the ldlm lock used for the obdo handle below. */
2078        i = 0;
2079        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2080                struct cl_page *page = oap2cl_page(oap);
2081                if (clerq == NULL) {
2082                        clerq = cl_req_alloc(env, page, crt,
2083                                             1 /* only 1-object rpcs for now */);
2084                        if (IS_ERR(clerq))
2085                                GOTO(out, rc = PTR_ERR(clerq));
2086                        lock = oap->oap_ldlm_lock;
2087                }
2088                if (mem_tight)
2089                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2090                pga[i] = &oap->oap_brw_page;
2091                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2092                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2093                       pga[i]->pg, page_index(oap->oap_page), oap,
2094                       pga[i]->flag);
2095                i++;
2096                cl_req_page_add(env, clerq, page);
2097        }
2098
2099        /* always get the data for the obdo for the rpc */
2100        LASSERT(clerq != NULL);
2101        crattr->cra_oa = oa;
2102        cl_req_attr_set(env, clerq, crattr, ~0ULL);
            /* if the first page carried a lock, send its remote handle along */
2103        if (lock) {
2104                oa->o_handle = lock->l_remote_handle;
2105                oa->o_valid |= OBD_MD_FLHANDLE;
2106        }
2107
2108        rc = cl_req_prep(env, clerq);
2109        if (rc != 0) {
2110                CERROR("cl_req_prep failed: %d\n", rc);
2111                GOTO(out, rc);
2112        }
2113
            /* order the pages by ascending offset before building the request */
2114        sort_brw_pages(pga, page_count);
2115        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2116                        pga, &req, crattr->cra_capa, 1, 0);
2117        if (rc != 0) {
2118                CERROR("prep_req failed: %d\n", rc);
2119                GOTO(out, rc);
2120        }
2121
2122        req->rq_interpret_reply = brw_interpret;
2123
2124        if (mem_tight != 0)
2125                req->rq_memalloc = 1;
2126
2127        /* Need to update the timestamps after the request is built in case
2128         * we race with setattr (locally or in queue at OST).  If OST gets
2129         * later setattr before earlier BRW (as determined by the request xid),
2130         * the OST will not use BRW timestamps.  Sadly, there is no obvious
2131         * way to do this in a single call.  bug 10150 */
2132        cl_req_attr_set(env, clerq, crattr,
2133                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2134
2135        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2136
            /* The async args are taken from the request itself (the CLASSERT
             * checks that they fit into rq_async_args).  Both the page list
             * and the caller's extent list are moved onto them, which leaves
             * rpc_list and ext_list empty from here on. */
2137        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2138        aa = ptlrpc_req_async_args(req);
2139        INIT_LIST_HEAD(&aa->aa_oaps);
2140        list_splice_init(&rpc_list, &aa->aa_oaps);
2141        INIT_LIST_HEAD(&aa->aa_exts);
2142        list_splice_init(ext_list, &aa->aa_exts);
2143        aa->aa_clerq = clerq;
2144
2145        /* queued sync pages can be torn down while the pages
2146         * were between the pending list and the rpc */
2147        tmp = NULL;
2148        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2149                /* only one oap gets a request reference */
2150                if (tmp == NULL)
2151                        tmp = oap;
                    /* mark the request interrupted once, on the first
                     * interrupted page that is found */
2152                if (oap->oap_interrupted && !req->rq_intr) {
2153                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2154                                        oap, req);
2155                        ptlrpc_mark_interrupted(req);
2156                }
2157        }
2158        if (tmp != NULL)
2159                tmp->oap_request = ptlrpc_request_addref(req);
2160
2161        client_obd_list_lock(&cli->cl_loi_list_lock);
            /* from here on starting_offset is a page index, not a byte offset */
2162        starting_offset >>= PAGE_CACHE_SHIFT;
2163        if (cmd == OBD_BRW_READ) {
2164                cli->cl_r_in_flight++;
2165                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2166                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2167                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2168                                      starting_offset + 1);
2169        } else {
2170                cli->cl_w_in_flight++;
2171                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2172                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2173                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2174                                      starting_offset + 1);
2175        }
2176        client_obd_list_unlock(&cli->cl_loi_list_lock);
2177
            /* NOTE(review): the in-flight counters are read here after the
             * list lock was dropped, so the values logged may already be
             * stale; this only affects the debug message. */
2178        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2179                  page_count, aa, cli->cl_r_in_flight,
2180                  cli->cl_w_in_flight);
2181
2182        /* XXX: Maybe the caller can check the RPC bulk descriptor to
2183         * see which CPU/NUMA node the majority of pages were allocated
2184         * on, and try to assign the async RPC to the CPU core
2185         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2186         *
2187         * But on the other hand, we expect that multiple ptlrpcd
2188         * threads and the initial write sponsor can run in parallel,
2189         * especially when data checksum is enabled, which is CPU-bound
2190         * operation and single ptlrpcd thread cannot process in time.
2191         * So more ptlrpcd threads sharing BRW load
2192         * (with PDL_POLICY_ROUND) seems better.
2193         */
2194        ptlrpcd_add_req(req, pol, -1);
2195        rc = 0;
2196
        /* Common exit.  The success path falls through to here as well: the
         * memory-pressure state and crattr are released in both cases, while
         * everything under 'rc != 0' runs on failure only. */
2197out:
2198        if (mem_tight != 0)
2199                cfs_memory_pressure_restore(mpflag);
2200
2201        if (crattr != NULL) {
2202                capa_put(crattr->cra_capa);
2203                OBD_FREE(crattr, sizeof(*crattr));
2204        }
2205
2206        if (rc != 0) {
                    /* Every failure path jumps here before a request exists,
                     * so oa and pga still belong to this function.  On
                     * success they are not freed here; brw_interpret() frees
                     * aa->aa_oa and aa->aa_ppga at completion (presumably
                     * these same objects, stored by osc_brw_prep_request() -
                     * confirm there). */
2207                LASSERT(req == NULL);
2208
2209                if (oa)
2210                        OBDO_FREE(oa);
2211                if (pga)
2212                        OBD_FREE(pga, sizeof(*pga) * page_count);
2213                /* this should happen rarely and is pretty bad, it makes the
2214                 * pending list not follow the dirty order */
2215                while (!list_empty(ext_list)) {
2216                        ext = list_entry(ext_list->next, struct osc_extent,
2217                                             oe_link);
2218                        list_del_init(&ext->oe_link);
2219                        osc_extent_finish(env, ext, 0, rc);
2220                }
                    /* clerq may hold an ERR_PTR if cl_req_alloc() failed */
2221                if (clerq && !IS_ERR(clerq))
2222                        cl_req_completion(env, clerq, rc);
2223        }
2224        return rc;
2225}
2226
2227static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2228                                        struct ldlm_enqueue_info *einfo)
2229{
2230        void *data = einfo->ei_cbdata;
2231        int set = 0;
2232
2233        LASSERT(lock != NULL);
2234        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2235        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2236        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2237        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2238
2239        lock_res_and_lock(lock);
2240        spin_lock(&osc_ast_guard);
2241
2242        if (lock->l_ast_data == NULL)
2243                lock->l_ast_data = data;
2244        if (lock->l_ast_data == data)
2245                set = 1;
2246
2247        spin_unlock(&osc_ast_guard);
2248        unlock_res_and_lock(lock);
2249
2250        return set;
2251}
2252
2253static int osc_set_data_with_check(struct lustre_handle *lockh,
2254                                   struct ldlm_enqueue_info *einfo)
2255{
2256        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2257        int set = 0;
2258
2259        if (lock != NULL) {
2260                set = osc_set_lock_data_with_check(lock, einfo);
2261                LDLM_LOCK_PUT(lock);
2262        } else
2263                CERROR("lockh %p, data %p - client evicted?\n",
2264                       lockh, einfo->ei_cbdata);
2265        return set;
2266}
2267
2268static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2269                             ldlm_iterator_t replace, void *data)
2270{
2271        struct ldlm_res_id res_id;
2272        struct obd_device *obd = class_exp2obd(exp);
2273
2274        ostid_build_res_name(&lsm->lsm_oi, &res_id);
2275        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2276        return 0;
2277}
2278
2279/* find any ldlm lock of the inode in osc
2280 * return 0    not find
2281 *      1    find one
2282 *      < 0    error */
2283static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2284                           ldlm_iterator_t replace, void *data)
2285{
2286        struct ldlm_res_id res_id;
2287        struct obd_device *obd = class_exp2obd(exp);
2288        int rc = 0;
2289
2290        ostid_build_res_name(&lsm->lsm_oi, &res_id);
2291        rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2292        if (rc == LDLM_ITER_STOP)
2293                return(1);
2294        if (rc == LDLM_ITER_CONTINUE)
2295                return(0);
2296        return(rc);
2297}
2298
2299static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2300                            obd_enqueue_update_f upcall, void *cookie,
2301                            __u64 *flags, int agl, int rc)
2302{
2303        int intent = *flags & LDLM_FL_HAS_INTENT;
2304
2305        if (intent) {
2306                /* The request was created before ldlm_cli_enqueue call. */
2307                if (rc == ELDLM_LOCK_ABORTED) {
2308                        struct ldlm_reply *rep;
2309                        rep = req_capsule_server_get(&req->rq_pill,
2310                                                     &RMF_DLM_REP);
2311
2312                        LASSERT(rep != NULL);
2313                        rep->lock_policy_res1 =
2314                                ptlrpc_status_ntoh(rep->lock_policy_res1);
2315                        if (rep->lock_policy_res1)
2316                                rc = rep->lock_policy_res1;
2317                }
2318        }
2319
2320        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2321            (rc == 0)) {
2322                *flags |= LDLM_FL_LVB_READY;
2323                CDEBUG(D_INODE,"got kms %llu blocks %llu mtime %llu\n",
2324                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2325        }
2326
2327        /* Call the update callback. */
2328        rc = (*upcall)(cookie, rc);
2329        return rc;
2330}
2331
2332static int osc_enqueue_interpret(const struct lu_env *env,
2333                                 struct ptlrpc_request *req,
2334                                 struct osc_enqueue_args *aa, int rc)
2335{
2336        struct ldlm_lock *lock;
2337        struct lustre_handle handle;
2338        __u32 mode;
2339        struct ost_lvb *lvb;
2340        __u32 lvb_len;
2341        __u64 *flags = aa->oa_flags;
2342
2343        /* Make a local copy of a lock handle and a mode, because aa->oa_*
2344         * might be freed anytime after lock upcall has been called. */
2345        lustre_handle_copy(&handle, aa->oa_lockh);
2346        mode = aa->oa_ei->ei_mode;
2347
2348        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2349         * be valid. */
2350        lock = ldlm_handle2lock(&handle);
2351
2352        /* Take an additional reference so that a blocking AST that
2353         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2354         * to arrive after an upcall has been executed by
2355         * osc_enqueue_fini(). */
2356        ldlm_lock_addref(&handle, mode);
2357
2358        /* Let CP AST to grant the lock first. */
2359        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2360
2361        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2362                lvb = NULL;
2363                lvb_len = 0;
2364        } else {
2365                lvb = aa->oa_lvb;
2366                lvb_len = sizeof(*aa->oa_lvb);
2367        }
2368
2369        /* Complete obtaining the lock procedure. */
2370        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2371                                   mode, flags, lvb, lvb_len, &handle, rc);
2372        /* Complete osc stuff. */
2373        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2374                              flags, aa->oa_agl, rc);
2375
2376        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2377
2378        /* Release the lock for async request. */
2379        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2380                /*
2381                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2382                 * not already released by
2383                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2384                 */
2385                ldlm_lock_decref(&handle, mode);
2386
2387        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2388                 aa->oa_lockh, req, aa);
2389        ldlm_lock_decref(&handle, mode);
2390        LDLM_LOCK_PUT(lock);
2391        return rc;
2392}
2393
2394void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2395                        struct lov_oinfo *loi, __u64 flags,
2396                        struct ost_lvb *lvb, __u32 mode, int rc)
2397{
2398        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2399
2400        if (rc == ELDLM_OK) {
2401                __u64 tmp;
2402
2403                LASSERT(lock != NULL);
2404                loi->loi_lvb = *lvb;
2405                tmp = loi->loi_lvb.lvb_size;
2406                /* Extend KMS up to the end of this lock and no further
2407                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2408                if (tmp > lock->l_policy_data.l_extent.end)
2409                        tmp = lock->l_policy_data.l_extent.end + 1;
2410                if (tmp >= loi->loi_kms) {
2411                        LDLM_DEBUG(lock, "lock acquired, setting rss=%llu, kms=%llu",
2412                                   loi->loi_lvb.lvb_size, tmp);
2413                        loi_kms_set(loi, tmp);
2414                } else {
2415                        LDLM_DEBUG(lock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
2416                                   loi->loi_lvb.lvb_size, loi->loi_kms,
2417                                   lock->l_policy_data.l_extent.end);
2418                }
2419                ldlm_lock_allow_match(lock);
2420        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2421                LASSERT(lock != NULL);
2422                loi->loi_lvb = *lvb;
2423                ldlm_lock_allow_match(lock);
2424                CDEBUG(D_INODE, "glimpsed, setting rss=%llu; leaving kms=%llu\n",
2425                       loi->loi_lvb.lvb_size, loi->loi_kms);
2426                rc = ELDLM_OK;
2427        }
2428
2429        if (lock != NULL) {
2430                if (rc != ELDLM_OK)
2431                        ldlm_lock_fail_match(lock);
2432
2433                LDLM_LOCK_PUT(lock);
2434        }
2435}
2436EXPORT_SYMBOL(osc_update_enqueue);
2437
/* Sentinel request-set pointer.  osc_enqueue_base() compares its rqset
 * argument against this value and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to a set with
 * ptlrpc_set_add_req().  In that code the value is only compared, never
 * dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2439
/*
 * Obtain an extent lock on the resource @res_id, either by matching an
 * already cached lock or by enqueuing a new one.
 *
 * @exp:       export the lock is requested through
 * @res_id:    ldlm resource name of the object
 * @flags:     in/out ldlm flags; LDLM_FL_LVB_READY is set here on a match
 *             and LDLM_FL_BLOCK_GRANTED is cleared before enqueuing
 * @policy:    requested extent; widened in place to page boundaries
 * @lvb:       buffer for the lock value block
 * @kms_valid: when zero, cached locks are not searched at all
 * @upcall:    completion callback, called with @cookie and a status
 * @cookie:    opaque argument for @upcall
 * @einfo:     enqueue description (mode, type, callbacks, callback data)
 * @lockh:     receives the handle of the matched or enqueued lock
 * @rqset:     when non-NULL the enqueue completes asynchronously through
 *             osc_enqueue_interpret(); PTLRPCD_SET selects ptlrpcd
 * @async:     passed through to ldlm_cli_enqueue()
 * @agl:       non-zero for AGL requests
 *
 * Returns ELDLM_OK when a cached lock was matched, -ECANCELED when an AGL
 * request matched a lock without LDLM_FL_LVB_READY, -ENOMEM or the
 * ldlm_prep_enqueue_req() error when the intent request could not be set
 * up, the ldlm_cli_enqueue() status for asynchronous requests, and the
 * osc_enqueue_fini() result otherwise.
 *
 * When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL requests do not require LDLM_FL_LVB_READY when matching. */
        __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                /* NOTE(review): matched is dereferenced below without a NULL
                 * check; presumably the reference held by the successful
                 * match keeps the handle valid -- confirm. */
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip processing this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        return -ECANCELED;
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        /* Drop the reference taken by the match: a PW
                         * reference when PW was matched for a PR request,
                         * otherwise the requested mode for async callers. */
                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        return ELDLM_OK;
                } else {
                        /* The matched lock carries different callback data;
                         * release it and enqueue a new lock instead. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueues use a request prepared here, with room in
                 * the reply for an LVB of sizeof(*lvb) bytes. */
                LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        return -ENOMEM;

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        return rc;
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof(*lvb));
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        /* Stash everything osc_enqueue_interpret() needs in
                         * the request's async args, then queue the request. */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        /* Enqueue failed: release the request prepared
                         * above. */
                        ptlrpc_req_finished(req);
                }
                return rc;
        }

        /* Synchronous case: finish the osc side (including the upcall) now. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        return rc;
}
2590
2591static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2592                       struct ldlm_enqueue_info *einfo,
2593                       struct ptlrpc_request_set *rqset)
2594{
2595        struct ldlm_res_id res_id;
2596        int rc;
2597
2598        ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2599        rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2600                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2601                              oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2602                              oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2603                              rqset, rqset != NULL, 0);
2604        return rc;
2605}
2606
2607int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2608                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2609                   __u64 *flags, void *data, struct lustre_handle *lockh,
2610                   int unref)
2611{
2612        struct obd_device *obd = exp->exp_obd;
2613        __u64 lflags = *flags;
2614        ldlm_mode_t rc;
2615
2616        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2617                return -EIO;
2618
2619        /* Filesystem lock extents are extended to page boundaries so that
2620         * dealing with the page cache is a little smoother */
2621        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2622        policy->l_extent.end |= ~CFS_PAGE_MASK;
2623
2624        /* Next, search for already existing extent locks that will cover us */
2625        /* If we're trying to read, we also search for an existing PW lock.  The
2626         * VFS and page cache already protect us locally, so lots of readers/
2627         * writers can share a single PW lock. */
2628        rc = mode;
2629        if (mode == LCK_PR)
2630                rc |= LCK_PW;
2631        rc = ldlm_lock_match(obd->obd_namespace, lflags,
2632                             res_id, type, policy, rc, lockh, unref);
2633        if (rc) {
2634                if (data != NULL) {
2635                        if (!osc_set_data_with_check(lockh, data)) {
2636                                if (!(lflags & LDLM_FL_TEST_LOCK))
2637                                        ldlm_lock_decref(lockh, rc);
2638                                return 0;
2639                        }
2640                }
2641                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2642                        ldlm_lock_addref(lockh, LCK_PR);
2643                        ldlm_lock_decref(lockh, LCK_PW);
2644                }
2645                return rc;
2646        }
2647        return rc;
2648}
2649
2650int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2651{
2652        if (unlikely(mode == LCK_GROUP))
2653                ldlm_lock_decref_and_cancel(lockh, mode);
2654        else
2655                ldlm_lock_decref(lockh, mode);
2656
2657        return 0;
2658}
2659
2660static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2661                      __u32 mode, struct lustre_handle *lockh)
2662{
2663        return osc_cancel_base(lockh, mode);
2664}
2665
2666static int osc_cancel_unused(struct obd_export *exp,
2667                             struct lov_stripe_md *lsm,
2668                             ldlm_cancel_flags_t flags,
2669                             void *opaque)
2670{
2671        struct obd_device *obd = class_exp2obd(exp);
2672        struct ldlm_res_id res_id, *resp = NULL;
2673
2674        if (lsm != NULL) {
2675                ostid_build_res_name(&lsm->lsm_oi, &res_id);
2676                resp = &res_id;
2677        }
2678
2679        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2680}
2681
2682static int osc_statfs_interpret(const struct lu_env *env,
2683                                struct ptlrpc_request *req,
2684                                struct osc_async_args *aa, int rc)
2685{
2686        struct obd_statfs *msfs;
2687
2688        if (rc == -EBADR)
2689                /* The request has in fact never been sent
2690                 * due to issues at a higher level (LOV).
2691                 * Exit immediately since the caller is
2692                 * aware of the problem and takes care
2693                 * of the clean up */
2694                 return rc;
2695
2696        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2697            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2698                GOTO(out, rc = 0);
2699
2700        if (rc != 0)
2701                GOTO(out, rc);
2702
2703        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2704        if (msfs == NULL) {
2705                GOTO(out, rc = -EPROTO);
2706        }
2707
2708        *aa->aa_oi->oi_osfs = *msfs;
2709out:
2710        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2711        return rc;
2712}
2713
2714static int osc_statfs_async(struct obd_export *exp,
2715                            struct obd_info *oinfo, __u64 max_age,
2716                            struct ptlrpc_request_set *rqset)
2717{
2718        struct obd_device     *obd = class_exp2obd(exp);
2719        struct ptlrpc_request *req;
2720        struct osc_async_args *aa;
2721        int                 rc;
2722
2723        /* We could possibly pass max_age in the request (as an absolute
2724         * timestamp or a "seconds.usec ago") so the target can avoid doing
2725         * extra calls into the filesystem if that isn't necessary (e.g.
2726         * during mount that would help a bit).  Having relative timestamps
2727         * is not so great if request processing is slow, while absolute
2728         * timestamps are not ideal because they need time synchronization. */
2729        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2730        if (req == NULL)
2731                return -ENOMEM;
2732
2733        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2734        if (rc) {
2735                ptlrpc_request_free(req);
2736                return rc;
2737        }
2738        ptlrpc_request_set_replen(req);
2739        req->rq_request_portal = OST_CREATE_PORTAL;
2740        ptlrpc_at_set_req_timeout(req);
2741
2742        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2743                /* procfs requests not want stat in wait for avoid deadlock */
2744                req->rq_no_resend = 1;
2745                req->rq_no_delay = 1;
2746        }
2747
2748        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2749        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2750        aa = ptlrpc_req_async_args(req);
2751        aa->aa_oi = oinfo;
2752
2753        ptlrpc_set_add_req(rqset, req);
2754        return 0;
2755}
2756
2757static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2758                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2759{
2760        struct obd_device     *obd = class_exp2obd(exp);
2761        struct obd_statfs     *msfs;
2762        struct ptlrpc_request *req;
2763        struct obd_import     *imp = NULL;
2764        int rc;
2765
2766        /*Since the request might also come from lprocfs, so we need
2767         *sync this with client_disconnect_export Bug15684*/
2768        down_read(&obd->u.cli.cl_sem);
2769        if (obd->u.cli.cl_import)
2770                imp = class_import_get(obd->u.cli.cl_import);
2771        up_read(&obd->u.cli.cl_sem);
2772        if (!imp)
2773                return -ENODEV;
2774
2775        /* We could possibly pass max_age in the request (as an absolute
2776         * timestamp or a "seconds.usec ago") so the target can avoid doing
2777         * extra calls into the filesystem if that isn't necessary (e.g.
2778         * during mount that would help a bit).  Having relative timestamps
2779         * is not so great if request processing is slow, while absolute
2780         * timestamps are not ideal because they need time synchronization. */
2781        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2782
2783        class_import_put(imp);
2784
2785        if (req == NULL)
2786                return -ENOMEM;
2787
2788        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2789        if (rc) {
2790                ptlrpc_request_free(req);
2791                return rc;
2792        }
2793        ptlrpc_request_set_replen(req);
2794        req->rq_request_portal = OST_CREATE_PORTAL;
2795        ptlrpc_at_set_req_timeout(req);
2796
2797        if (flags & OBD_STATFS_NODELAY) {
2798                /* procfs requests not want stat in wait for avoid deadlock */
2799                req->rq_no_resend = 1;
2800                req->rq_no_delay = 1;
2801        }
2802
2803        rc = ptlrpc_queue_wait(req);
2804        if (rc)
2805                GOTO(out, rc);
2806
2807        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2808        if (msfs == NULL) {
2809                GOTO(out, rc = -EPROTO);
2810        }
2811
2812        *osfs = *msfs;
2813
2814 out:
2815        ptlrpc_req_finished(req);
2816        return rc;
2817}
2818
2819/* Retrieve object striping information.
2820 *
2821 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2822 * the maximum number of OST indices which will fit in the user buffer.
2823 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2824 */
2825static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2826{
2827        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2828        struct lov_user_md_v3 lum, *lumk;
2829        struct lov_user_ost_data_v1 *lmm_objects;
2830        int rc = 0, lum_size;
2831
2832        if (!lsm)
2833                return -ENODATA;
2834
2835        /* we only need the header part from user space to get lmm_magic and
2836         * lmm_stripe_count, (the header part is common to v1 and v3) */
2837        lum_size = sizeof(struct lov_user_md_v1);
2838        if (copy_from_user(&lum, lump, lum_size))
2839                return -EFAULT;
2840
2841        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2842            (lum.lmm_magic != LOV_USER_MAGIC_V3))
2843                return -EINVAL;
2844
2845        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2846        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2847        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2848        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2849
2850        /* we can use lov_mds_md_size() to compute lum_size
2851         * because lov_user_md_vX and lov_mds_md_vX have the same size */
2852        if (lum.lmm_stripe_count > 0) {
2853                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2854                OBD_ALLOC(lumk, lum_size);
2855                if (!lumk)
2856                        return -ENOMEM;
2857
2858                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2859                        lmm_objects =
2860                            &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2861                else
2862                        lmm_objects = &(lumk->lmm_objects[0]);
2863                lmm_objects->l_ost_oi = lsm->lsm_oi;
2864        } else {
2865                lum_size = lov_mds_md_size(0, lum.lmm_magic);
2866                lumk = &lum;
2867        }
2868
2869        lumk->lmm_oi = lsm->lsm_oi;
2870        lumk->lmm_stripe_count = 1;
2871
2872        if (copy_to_user(lump, lumk, lum_size))
2873                rc = -EFAULT;
2874
2875        if (lumk != &lum)
2876                OBD_FREE(lumk, lum_size);
2877
2878        return rc;
2879}
2880
2881
2882static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2883                         void *karg, void *uarg)
2884{
2885        struct obd_device *obd = exp->exp_obd;
2886        struct obd_ioctl_data *data = karg;
2887        int err = 0;
2888
2889        if (!try_module_get(THIS_MODULE)) {
2890                CERROR("Can't get module. Is it alive?");
2891                return -EINVAL;
2892        }
2893        switch (cmd) {
2894        case OBD_IOC_LOV_GET_CONFIG: {
2895                char *buf;
2896                struct lov_desc *desc;
2897                struct obd_uuid uuid;
2898
2899                buf = NULL;
2900                len = 0;
2901                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2902                        GOTO(out, err = -EINVAL);
2903
2904                data = (struct obd_ioctl_data *)buf;
2905
2906                if (sizeof(*desc) > data->ioc_inllen1) {
2907                        obd_ioctl_freedata(buf, len);
2908                        GOTO(out, err = -EINVAL);
2909                }
2910
2911                if (data->ioc_inllen2 < sizeof(uuid)) {
2912                        obd_ioctl_freedata(buf, len);
2913                        GOTO(out, err = -EINVAL);
2914                }
2915
2916                desc = (struct lov_desc *)data->ioc_inlbuf1;
2917                desc->ld_tgt_count = 1;
2918                desc->ld_active_tgt_count = 1;
2919                desc->ld_default_stripe_count = 1;
2920                desc->ld_default_stripe_size = 0;
2921                desc->ld_default_stripe_offset = 0;
2922                desc->ld_pattern = 0;
2923                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2924
2925                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2926
2927                err = copy_to_user((void *)uarg, buf, len);
2928                if (err)
2929                        err = -EFAULT;
2930                obd_ioctl_freedata(buf, len);
2931                GOTO(out, err);
2932        }
2933        case LL_IOC_LOV_SETSTRIPE:
2934                err = obd_alloc_memmd(exp, karg);
2935                if (err > 0)
2936                        err = 0;
2937                GOTO(out, err);
2938        case LL_IOC_LOV_GETSTRIPE:
2939                err = osc_getstripe(karg, uarg);
2940                GOTO(out, err);
2941        case OBD_IOC_CLIENT_RECOVER:
2942                err = ptlrpc_recover_import(obd->u.cli.cl_import,
2943                                            data->ioc_inlbuf1, 0);
2944                if (err > 0)
2945                        err = 0;
2946                GOTO(out, err);
2947        case IOC_OSC_SET_ACTIVE:
2948                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2949                                               data->ioc_offset);
2950                GOTO(out, err);
2951        case OBD_IOC_POLL_QUOTACHECK:
2952                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2953                GOTO(out, err);
2954        case OBD_IOC_PING_TARGET:
2955                err = ptlrpc_obd_ping(obd);
2956                GOTO(out, err);
2957        default:
2958                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2959                       cmd, current_comm());
2960                GOTO(out, err = -ENOTTY);
2961        }
2962out:
2963        module_put(THIS_MODULE);
2964        return err;
2965}
2966
2967static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2968                        obd_count keylen, void *key, __u32 *vallen, void *val,
2969                        struct lov_stripe_md *lsm)
2970{
2971        if (!vallen || !val)
2972                return -EFAULT;
2973
2974        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2975                __u32 *stripe = val;
2976                *vallen = sizeof(*stripe);
2977                *stripe = 0;
2978                return 0;
2979        } else if (KEY_IS(KEY_LAST_ID)) {
2980                struct ptlrpc_request *req;
2981                obd_id          *reply;
2982                char              *tmp;
2983                int                 rc;
2984
2985                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2986                                           &RQF_OST_GET_INFO_LAST_ID);
2987                if (req == NULL)
2988                        return -ENOMEM;
2989
2990                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2991                                     RCL_CLIENT, keylen);
2992                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2993                if (rc) {
2994                        ptlrpc_request_free(req);
2995                        return rc;
2996                }
2997
2998                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2999                memcpy(tmp, key, keylen);
3000
3001                req->rq_no_delay = req->rq_no_resend = 1;
3002                ptlrpc_request_set_replen(req);
3003                rc = ptlrpc_queue_wait(req);
3004                if (rc)
3005                        GOTO(out, rc);
3006
3007                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3008                if (reply == NULL)
3009                        GOTO(out, rc = -EPROTO);
3010
3011                *((obd_id *)val) = *reply;
3012        out:
3013                ptlrpc_req_finished(req);
3014                return rc;
3015        } else if (KEY_IS(KEY_FIEMAP)) {
3016                struct ll_fiemap_info_key *fm_key =
3017                                (struct ll_fiemap_info_key *)key;
3018                struct ldlm_res_id       res_id;
3019                ldlm_policy_data_t       policy;
3020                struct lustre_handle     lockh;
3021                ldlm_mode_t              mode = 0;
3022                struct ptlrpc_request   *req;
3023                struct ll_user_fiemap   *reply;
3024                char                    *tmp;
3025                int                      rc;
3026
3027                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3028                        goto skip_locking;
3029
3030                policy.l_extent.start = fm_key->fiemap.fm_start &
3031                                                CFS_PAGE_MASK;
3032
3033                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3034                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3035                        policy.l_extent.end = OBD_OBJECT_EOF;
3036                else
3037                        policy.l_extent.end = (fm_key->fiemap.fm_start +
3038                                fm_key->fiemap.fm_length +
3039                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3040
3041                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3042                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3043                                       LDLM_FL_BLOCK_GRANTED |
3044                                       LDLM_FL_LVB_READY,
3045                                       &res_id, LDLM_EXTENT, &policy,
3046                                       LCK_PR | LCK_PW, &lockh, 0);
3047                if (mode) { /* lock is cached on client */
3048                        if (mode != LCK_PR) {
3049                                ldlm_lock_addref(&lockh, LCK_PR);
3050                                ldlm_lock_decref(&lockh, LCK_PW);
3051                        }
3052                } else { /* no cached lock, needs acquire lock on server side */
3053                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3054                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3055                }
3056
3057skip_locking:
3058                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3059                                           &RQF_OST_GET_INFO_FIEMAP);
3060                if (req == NULL)
3061                        GOTO(drop_lock, rc = -ENOMEM);
3062
3063                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3064                                     RCL_CLIENT, keylen);
3065                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3066                                     RCL_CLIENT, *vallen);
3067                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3068                                     RCL_SERVER, *vallen);
3069
3070                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3071                if (rc) {
3072                        ptlrpc_request_free(req);
3073                        GOTO(drop_lock, rc);
3074                }
3075
3076                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3077                memcpy(tmp, key, keylen);
3078                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3079                memcpy(tmp, val, *vallen);
3080
3081                ptlrpc_request_set_replen(req);
3082                rc = ptlrpc_queue_wait(req);
3083                if (rc)
3084                        GOTO(fini_req, rc);
3085
3086                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3087                if (reply == NULL)
3088                        GOTO(fini_req, rc = -EPROTO);
3089
3090                memcpy(val, reply, *vallen);
3091fini_req:
3092                ptlrpc_req_finished(req);
3093drop_lock:
3094                if (mode)
3095                        ldlm_lock_decref(&lockh, LCK_PR);
3096                return rc;
3097        }
3098
3099        return -EINVAL;
3100}
3101
3102static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3103                              obd_count keylen, void *key, obd_count vallen,
3104                              void *val, struct ptlrpc_request_set *set)
3105{
3106        struct ptlrpc_request *req;
3107        struct obd_device     *obd = exp->exp_obd;
3108        struct obd_import     *imp = class_exp2cliimp(exp);
3109        char              *tmp;
3110        int                 rc;
3111
3112        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3113
3114        if (KEY_IS(KEY_CHECKSUM)) {
3115                if (vallen != sizeof(int))
3116                        return -EINVAL;
3117                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3118                return 0;
3119        }
3120
3121        if (KEY_IS(KEY_SPTLRPC_CONF)) {
3122                sptlrpc_conf_client_adapt(obd);
3123                return 0;
3124        }
3125
3126        if (KEY_IS(KEY_FLUSH_CTX)) {
3127                sptlrpc_import_flush_my_ctx(imp);
3128                return 0;
3129        }
3130
3131        if (KEY_IS(KEY_CACHE_SET)) {
3132                struct client_obd *cli = &obd->u.cli;
3133
3134                LASSERT(cli->cl_cache == NULL); /* only once */
3135                cli->cl_cache = (struct cl_client_cache *)val;
3136                atomic_inc(&cli->cl_cache->ccc_users);
3137                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3138
3139                /* add this osc into entity list */
3140                LASSERT(list_empty(&cli->cl_lru_osc));
3141                spin_lock(&cli->cl_cache->ccc_lru_lock);
3142                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3143                spin_unlock(&cli->cl_cache->ccc_lru_lock);
3144
3145                return 0;
3146        }
3147
3148        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3149                struct client_obd *cli = &obd->u.cli;
3150                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3151                int target = *(int *)val;
3152
3153                nr = osc_lru_shrink(cli, min(nr, target));
3154                *(int *)val -= nr;
3155                return 0;
3156        }
3157
3158        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3159                return -EINVAL;
3160
3161        /* We pass all other commands directly to OST. Since nobody calls osc
3162           methods directly and everybody is supposed to go through LOV, we
3163           assume lov checked invalid values for us.
3164           The only recognised values so far are evict_by_nid and mds_conn.
3165           Even if something bad goes through, we'd get a -EINVAL from OST
3166           anyway. */
3167
3168        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3169                                                &RQF_OST_SET_GRANT_INFO :
3170                                                &RQF_OBD_SET_INFO);
3171        if (req == NULL)
3172                return -ENOMEM;
3173
3174        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3175                             RCL_CLIENT, keylen);
3176        if (!KEY_IS(KEY_GRANT_SHRINK))
3177                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3178                                     RCL_CLIENT, vallen);
3179        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3180        if (rc) {
3181                ptlrpc_request_free(req);
3182                return rc;
3183        }
3184
3185        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3186        memcpy(tmp, key, keylen);
3187        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3188                                                        &RMF_OST_BODY :
3189                                                        &RMF_SETINFO_VAL);
3190        memcpy(tmp, val, vallen);
3191
3192        if (KEY_IS(KEY_GRANT_SHRINK)) {
3193                struct osc_grant_args *aa;
3194                struct obdo *oa;
3195
3196                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3197                aa = ptlrpc_req_async_args(req);
3198                OBDO_ALLOC(oa);
3199                if (!oa) {
3200                        ptlrpc_req_finished(req);
3201                        return -ENOMEM;
3202                }
3203                *oa = ((struct ost_body *)val)->oa;
3204                aa->aa_oa = oa;
3205                req->rq_interpret_reply = osc_shrink_grant_interpret;
3206        }
3207
3208        ptlrpc_request_set_replen(req);
3209        if (!KEY_IS(KEY_GRANT_SHRINK)) {
3210                LASSERT(set != NULL);
3211                ptlrpc_set_add_req(set, req);
3212                ptlrpc_check_set(NULL, set);
3213        } else
3214                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3215
3216        return 0;
3217}
3218
3219
/*
 * llog initialisation hook for the OSC.
 *
 * This path is not meant to be used together with LOD/OSP and is slated
 * for removal, so reaching it is treated as a bug.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
			 struct obd_device *disk_obd, int *index)
{
	LBUG();

	return 0;
}
3228
3229static int osc_llog_finish(struct obd_device *obd, int count)
3230{
3231        struct llog_ctxt *ctxt;
3232
3233        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3234        if (ctxt) {
3235                llog_cat_close(NULL, ctxt->loc_handle);
3236                llog_cleanup(NULL, ctxt);
3237        }
3238
3239        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3240        if (ctxt)
3241                llog_cleanup(NULL, ctxt);
3242        return 0;
3243}
3244
3245static int osc_reconnect(const struct lu_env *env,
3246                         struct obd_export *exp, struct obd_device *obd,
3247                         struct obd_uuid *cluuid,
3248                         struct obd_connect_data *data,
3249                         void *localdata)
3250{
3251        struct client_obd *cli = &obd->u.cli;
3252
3253        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3254                long lost_grant;
3255
3256                client_obd_list_lock(&cli->cl_loi_list_lock);
3257                data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3258                                2 * cli_brw_size(obd);
3259                lost_grant = cli->cl_lost_grant;
3260                cli->cl_lost_grant = 0;
3261                client_obd_list_unlock(&cli->cl_loi_list_lock);
3262
3263                CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3264                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3265                       data->ocd_version, data->ocd_grant, lost_grant);
3266        }
3267
3268        return 0;
3269}
3270
3271static int osc_disconnect(struct obd_export *exp)
3272{
3273        struct obd_device *obd = class_exp2obd(exp);
3274        struct llog_ctxt  *ctxt;
3275        int rc;
3276
3277        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3278        if (ctxt) {
3279                if (obd->u.cli.cl_conn_count == 1) {
3280                        /* Flush any remaining cancel messages out to the
3281                         * target */
3282                        llog_sync(ctxt, exp, 0);
3283                }
3284                llog_ctxt_put(ctxt);
3285        } else {
3286                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3287                       obd);
3288        }
3289
3290        rc = client_disconnect_export(exp);
3291        /**
3292         * Initially we put del_shrink_grant before disconnect_export, but it
3293         * causes the following problem if setup (connect) and cleanup
3294         * (disconnect) are tangled together.
3295         *      connect p1                   disconnect p2
3296         *   ptlrpc_connect_import
3297         *     ...............         class_manual_cleanup
3298         *                                   osc_disconnect
3299         *                                   del_shrink_grant
3300         *   ptlrpc_connect_interrupt
3301         *     init_grant_shrink
3302         *   add this client to shrink list
3303         *                                    cleanup_osc
3304         * Bang! pinger trigger the shrink.
3305         * So the osc should be disconnected from the shrink list, after we
3306         * are sure the import has been destroyed. BUG18662
3307         */
3308        if (obd->u.cli.cl_import == NULL)
3309                osc_del_shrink_grant(&obd->u.cli);
3310        return rc;
3311}
3312
3313static int osc_import_event(struct obd_device *obd,
3314                            struct obd_import *imp,
3315                            enum obd_import_event event)
3316{
3317        struct client_obd *cli;
3318        int rc = 0;
3319
3320        LASSERT(imp->imp_obd == obd);
3321
3322        switch (event) {
3323        case IMP_EVENT_DISCON: {
3324                cli = &obd->u.cli;
3325                client_obd_list_lock(&cli->cl_loi_list_lock);
3326                cli->cl_avail_grant = 0;
3327                cli->cl_lost_grant = 0;
3328                client_obd_list_unlock(&cli->cl_loi_list_lock);
3329                break;
3330        }
3331        case IMP_EVENT_INACTIVE: {
3332                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3333                break;
3334        }
3335        case IMP_EVENT_INVALIDATE: {
3336                struct ldlm_namespace *ns = obd->obd_namespace;
3337                struct lu_env    *env;
3338                int                 refcheck;
3339
3340                env = cl_env_get(&refcheck);
3341                if (!IS_ERR(env)) {
3342                        /* Reset grants */
3343                        cli = &obd->u.cli;
3344                        /* all pages go to failing rpcs due to the invalid
3345                         * import */
3346                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3347
3348                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3349                        cl_env_put(env, &refcheck);
3350                } else
3351                        rc = PTR_ERR(env);
3352                break;
3353        }
3354        case IMP_EVENT_ACTIVE: {
3355                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3356                break;
3357        }
3358        case IMP_EVENT_OCD: {
3359                struct obd_connect_data *ocd = &imp->imp_connect_data;
3360
3361                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3362                        osc_init_grant(&obd->u.cli, ocd);
3363
3364                /* See bug 7198 */
3365                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3366                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3367
3368                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3369                break;
3370        }
3371        case IMP_EVENT_DEACTIVATE: {
3372                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3373                break;
3374        }
3375        case IMP_EVENT_ACTIVATE: {
3376                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3377                break;
3378        }
3379        default:
3380                CERROR("Unknown import event %d\n", event);
3381                LBUG();
3382        }
3383        return rc;
3384}
3385
3386/**
3387 * Determine whether the lock can be canceled before replaying the lock
3388 * during recovery, see bug16774 for detailed information.
3389 *
3390 * \retval zero the lock can't be canceled
3391 * \retval other ok to cancel
3392 */
3393static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3394{
3395        check_res_locked(lock->l_resource);
3396
3397        /*
3398         * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3399         *
3400         * XXX as a future improvement, we can also cancel unused write lock
3401         * if it doesn't have dirty data and active mmaps.
3402         */
3403        if (lock->l_resource->lr_type == LDLM_EXTENT &&
3404            (lock->l_granted_mode == LCK_PR ||
3405             lock->l_granted_mode == LCK_CR) &&
3406            (osc_dlm_lock_pageref(lock) == 0))
3407                return 1;
3408
3409        return 0;
3410}
3411
3412static int brw_queue_work(const struct lu_env *env, void *data)
3413{
3414        struct client_obd *cli = data;
3415
3416        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3417
3418        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3419        return 0;
3420}
3421
3422int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3423{
3424        struct lprocfs_static_vars lvars = { NULL };
3425        struct client_obd         *cli = &obd->u.cli;
3426        void                   *handler;
3427        int                     rc;
3428
3429        rc = ptlrpcd_addref();
3430        if (rc)
3431                return rc;
3432
3433        rc = client_obd_setup(obd, lcfg);
3434        if (rc)
3435                GOTO(out_ptlrpcd, rc);
3436
3437        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3438        if (IS_ERR(handler))
3439                GOTO(out_client_setup, rc = PTR_ERR(handler));
3440        cli->cl_writeback_work = handler;
3441
3442        rc = osc_quota_setup(obd);
3443        if (rc)
3444                GOTO(out_ptlrpcd_work, rc);
3445
3446        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3447        lprocfs_osc_init_vars(&lvars);
3448        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3449                lproc_osc_attach_seqstat(obd);
3450                sptlrpc_lprocfs_cliobd_attach(obd);
3451                ptlrpc_lprocfs_register_obd(obd);
3452        }
3453
3454        /* We need to allocate a few requests more, because
3455         * brw_interpret tries to create new requests before freeing
3456         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3457         * reserved, but I'm afraid that might be too much wasted RAM
3458         * in fact, so 2 is just my guess and still should work. */
3459        cli->cl_import->imp_rq_pool =
3460                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3461                                    OST_MAXREQSIZE,
3462                                    ptlrpc_add_rqs_to_pool);
3463
3464        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3465        ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3466        return rc;
3467
3468out_ptlrpcd_work:
3469        ptlrpcd_destroy_work(handler);
3470out_client_setup:
3471        client_obd_cleanup(obd);
3472out_ptlrpcd:
3473        ptlrpcd_decref();
3474        return rc;
3475}
3476
3477static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3478{
3479        int rc = 0;
3480
3481        switch (stage) {
3482        case OBD_CLEANUP_EARLY: {
3483                struct obd_import *imp;
3484                imp = obd->u.cli.cl_import;
3485                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3486                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3487                ptlrpc_deactivate_import(imp);
3488                spin_lock(&imp->imp_lock);
3489                imp->imp_pingable = 0;
3490                spin_unlock(&imp->imp_lock);
3491                break;
3492        }
3493        case OBD_CLEANUP_EXPORTS: {
3494                struct client_obd *cli = &obd->u.cli;
3495                /* LU-464
3496                 * for echo client, export may be on zombie list, wait for
3497                 * zombie thread to cull it, because cli.cl_import will be
3498                 * cleared in client_disconnect_export():
3499                 *   class_export_destroy() -> obd_cleanup() ->
3500                 *   echo_device_free() -> echo_client_cleanup() ->
3501                 *   obd_disconnect() -> osc_disconnect() ->
3502                 *   client_disconnect_export()
3503                 */
3504                obd_zombie_barrier();
3505                if (cli->cl_writeback_work) {
3506                        ptlrpcd_destroy_work(cli->cl_writeback_work);
3507                        cli->cl_writeback_work = NULL;
3508                }
3509                obd_cleanup_client_import(obd);
3510                ptlrpc_lprocfs_unregister_obd(obd);
3511                lprocfs_obd_cleanup(obd);
3512                rc = obd_llog_finish(obd, 0);
3513                if (rc != 0)
3514                        CERROR("failed to cleanup llogging subsystems\n");
3515                break;
3516                }
3517        }
3518        return rc;
3519}
3520
3521int osc_cleanup(struct obd_device *obd)
3522{
3523        struct client_obd *cli = &obd->u.cli;
3524        int rc;
3525
3526        /* lru cleanup */
3527        if (cli->cl_cache != NULL) {
3528                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3529                spin_lock(&cli->cl_cache->ccc_lru_lock);
3530                list_del_init(&cli->cl_lru_osc);
3531                spin_unlock(&cli->cl_cache->ccc_lru_lock);
3532                cli->cl_lru_left = NULL;
3533                atomic_dec(&cli->cl_cache->ccc_users);
3534                cli->cl_cache = NULL;
3535        }
3536
3537        /* free memory of osc quota cache */
3538        osc_quota_cleanup(obd);
3539
3540        rc = client_obd_cleanup(obd);
3541
3542        ptlrpcd_decref();
3543        return rc;
3544}
3545
3546int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3547{
3548        struct lprocfs_static_vars lvars = { NULL };
3549        int rc = 0;
3550
3551        lprocfs_osc_init_vars(&lvars);
3552
3553        switch (lcfg->lcfg_command) {
3554        default:
3555                rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3556                                              lcfg, obd);
3557                if (rc > 0)
3558                        rc = 0;
3559                break;
3560        }
3561
3562        return(rc);
3563}
3564
3565static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3566{
3567        return osc_process_config_base(obd, buf);
3568}
3569
3570struct obd_ops osc_obd_ops = {
3571        .o_owner                = THIS_MODULE,
3572        .o_setup                = osc_setup,
3573        .o_precleanup      = osc_precleanup,
3574        .o_cleanup            = osc_cleanup,
3575        .o_add_conn          = client_import_add_conn,
3576        .o_del_conn          = client_import_del_conn,
3577        .o_connect            = client_connect_import,
3578        .o_reconnect        = osc_reconnect,
3579        .o_disconnect      = osc_disconnect,
3580        .o_statfs              = osc_statfs,
3581        .o_statfs_async  = osc_statfs_async,
3582        .o_packmd              = osc_packmd,
3583        .o_unpackmd          = osc_unpackmd,
3584        .o_create              = osc_create,
3585        .o_destroy            = osc_destroy,
3586        .o_getattr            = osc_getattr,
3587        .o_getattr_async        = osc_getattr_async,
3588        .o_setattr            = osc_setattr,
3589        .o_setattr_async        = osc_setattr_async,
3590        .o_brw            = osc_brw,
3591        .o_punch                = osc_punch,
3592        .o_sync          = osc_sync,
3593        .o_enqueue            = osc_enqueue,
3594        .o_change_cbdata        = osc_change_cbdata,
3595        .o_find_cbdata    = osc_find_cbdata,
3596        .o_cancel              = osc_cancel,
3597        .o_cancel_unused        = osc_cancel_unused,
3598        .o_iocontrol        = osc_iocontrol,
3599        .o_get_info          = osc_get_info,
3600        .o_set_info_async       = osc_set_info_async,
3601        .o_import_event  = osc_import_event,
3602        .o_llog_init        = osc_llog_init,
3603        .o_llog_finish    = osc_llog_finish,
3604        .o_process_config       = osc_process_config,
3605        .o_quotactl          = osc_quotactl,
3606        .o_quotacheck      = osc_quotacheck,
3607};
3608
3609extern struct lu_kmem_descr osc_caches[];
3610extern spinlock_t osc_ast_guard;
3611extern struct lock_class_key osc_ast_guard_class;
3612
3613int __init osc_init(void)
3614{
3615        struct lprocfs_static_vars lvars = { NULL };
3616        int rc;
3617
3618        /* print an address of _any_ initialized kernel symbol from this
3619         * module, to allow debugging with gdb that doesn't support data
3620         * symbols from modules.*/
3621        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3622
3623        rc = lu_kmem_init(osc_caches);
3624        if (rc)
3625                return rc;
3626
3627        lprocfs_osc_init_vars(&lvars);
3628
3629        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3630                                 LUSTRE_OSC_NAME, &osc_device_type);
3631        if (rc) {
3632                lu_kmem_fini(osc_caches);
3633                return rc;
3634        }
3635
3636        spin_lock_init(&osc_ast_guard);
3637        lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3638
3639        return rc;
3640}
3641
3642static void /*__exit*/ osc_exit(void)
3643{
3644        class_unregister_type(LUSTRE_OSC_NAME);
3645        lu_kmem_fini(osc_caches);
3646}
3647
3648MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3649MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3650MODULE_LICENSE("GPL");
3651MODULE_VERSION(LUSTRE_VERSION_STRING);
3652
3653module_init(osc_init);
3654module_exit(osc_exit);
3655