linux/drivers/staging/lustre/lustre/lov/lov_log.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/lov/lov_log.c
  37 *
  38 * Author: Phil Schwan <phil@clusterfs.com>
  39 * Author: Peter Braam <braam@clusterfs.com>
  40 * Author: Mike Shaver <shaver@clusterfs.com>
  41 */
  42
  43#define DEBUG_SUBSYSTEM S_LOV
  44#include <linux/libcfs/libcfs.h>
  45
  46#include <obd_support.h>
  47#include <lustre_lib.h>
  48#include <lustre_net.h>
  49#include <lustre/lustre_idl.h>
  50#include <lustre_dlm.h>
  51#include <lustre_mds.h>
  52#include <obd_class.h>
  53#include <obd_lov.h>
  54#include <obd_ost.h>
  55#include <lprocfs_status.h>
  56#include <lustre_log.h>
  57
  58#include "lov_internal.h"
  59
  60/* Add log records for each OSC that this object is striped over, and return
  61 * cookies for each one.  We _would_ have nice abstraction here, except that
  62 * we need to keep cookies in stripe order, even if some are NULL, so that
  63 * the right cookies are passed back to the right OSTs at the client side.
  64 * Unset cookies should be all-zero (which will never occur naturally). */
  65static int lov_llog_origin_add(const struct lu_env *env,
  66                               struct llog_ctxt *ctxt,
  67                               struct llog_rec_hdr *rec,
  68                               struct lov_stripe_md *lsm,
  69                               struct llog_cookie *logcookies, int numcookies)
  70{
  71        struct obd_device *obd = ctxt->loc_obd;
  72        struct lov_obd *lov = &obd->u.lov;
  73        int i, rc = 0, cookies = 0;
  74        ENTRY;
  75
  76        LASSERTF(logcookies && numcookies >= lsm->lsm_stripe_count,
  77                 "logcookies %p, numcookies %d lsm->lsm_stripe_count %d \n",
  78                 logcookies, numcookies, lsm->lsm_stripe_count);
  79
  80        for (i = 0; i < lsm->lsm_stripe_count; i++) {
  81                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
  82                struct obd_device *child =
  83                        lov->lov_tgts[loi->loi_ost_idx]->ltd_exp->exp_obd;
  84                struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx);
  85
  86                /* fill mds unlink/setattr log record */
  87                switch (rec->lrh_type) {
  88                case MDS_UNLINK_REC: {
  89                        struct llog_unlink_rec *lur = (struct llog_unlink_rec *)rec;
  90                        lur->lur_oid = ostid_id(&loi->loi_oi);
  91                        lur->lur_oseq = (__u32)ostid_seq(&loi->loi_oi);
  92                        break;
  93                }
  94                case MDS_SETATTR64_REC: {
  95                        struct llog_setattr64_rec *lsr = (struct llog_setattr64_rec *)rec;
  96                        lsr->lsr_oi = loi->loi_oi;
  97                        break;
  98                }
  99                default:
 100                        break;
 101                }
 102
 103                /* inject error in llog_obd_add() below */
 104                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FAIL_LOV_LOG_ADD)) {
 105                        llog_ctxt_put(cctxt);
 106                        cctxt = NULL;
 107                }
 108                rc = llog_obd_add(env, cctxt, rec, NULL, logcookies + cookies,
 109                                  numcookies - cookies);
 110                llog_ctxt_put(cctxt);
 111                if (rc < 0) {
 112                        CERROR("Can't add llog (rc = %d) for stripe %d\n",
 113                               rc, cookies);
 114                        memset(logcookies + cookies, 0,
 115                               sizeof(struct llog_cookie));
 116                        rc = 1; /* skip this cookie */
 117                }
 118                /* Note that rc is always 1 if llog_obd_add was successful */
 119                cookies += rc;
 120        }
 121        RETURN(cookies);
 122}
 123
 124static int lov_llog_origin_connect(struct llog_ctxt *ctxt,
 125                                   struct llog_logid *logid,
 126                                   struct llog_gen *gen,
 127                                   struct obd_uuid *uuid)
 128{
 129        struct obd_device *obd = ctxt->loc_obd;
 130        struct lov_obd *lov = &obd->u.lov;
 131        int i, rc = 0, err = 0;
 132        ENTRY;
 133
 134        obd_getref(obd);
 135        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 136                struct obd_device *child;
 137                struct llog_ctxt *cctxt;
 138
 139                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
 140                        continue;
 141                if (uuid && !obd_uuid_equals(uuid, &lov->lov_tgts[i]->ltd_uuid))
 142                        continue;
 143                CDEBUG(D_CONFIG, "connect %d/%d\n", i, lov->desc.ld_tgt_count);
 144                child = lov->lov_tgts[i]->ltd_exp->exp_obd;
 145                cctxt = llog_get_context(child, ctxt->loc_idx);
 146                rc = llog_connect(cctxt, logid, gen, uuid);
 147                llog_ctxt_put(cctxt);
 148
 149                if (rc) {
 150                        CERROR("error osc_llog_connect tgt %d (%d)\n", i, rc);
 151                        if (!err)
 152                                err = rc;
 153                }
 154        }
 155        obd_putref(obd);
 156
 157        RETURN(err);
 158}
 159
 160/* the replicators commit callback */
 161static int lov_llog_repl_cancel(const struct lu_env *env,
 162                                struct llog_ctxt *ctxt,
 163                                struct lov_stripe_md *lsm,
 164                                int count, struct llog_cookie *cookies,
 165                                int flags)
 166{
 167        struct lov_obd *lov;
 168        struct obd_device *obd = ctxt->loc_obd;
 169        int rc = 0, i;
 170        ENTRY;
 171
 172        LASSERT(lsm != NULL);
 173        LASSERT(count == lsm->lsm_stripe_count);
 174
 175        lov = &obd->u.lov;
 176        obd_getref(obd);
 177        for (i = 0; i < count; i++, cookies++) {
 178                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
 179                struct obd_device *child =
 180                        lov->lov_tgts[loi->loi_ost_idx]->ltd_exp->exp_obd;
 181                struct llog_ctxt *cctxt =
 182                        llog_get_context(child, ctxt->loc_idx);
 183                int err;
 184
 185                err = llog_cancel(env, cctxt, NULL, 1, cookies, flags);
 186                llog_ctxt_put(cctxt);
 187                if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
 188                        CERROR("%s: objid "DOSTID" subobj "DOSTID
 189                               " on OST idx %d: rc = %d\n",
 190                               obd->obd_name, POSTID(&lsm->lsm_oi),
 191                               POSTID(&loi->loi_oi), loi->loi_ost_idx, err);
 192                        if (!rc)
 193                                rc = err;
 194                }
 195        }
 196        obd_putref(obd);
 197        RETURN(rc);
 198}
 199
 200static struct llog_operations lov_mds_ost_orig_logops = {
 201        .lop_obd_add    = lov_llog_origin_add,
 202        .lop_connect    = lov_llog_origin_connect,
 203};
 204
 205static struct llog_operations lov_size_repl_logops = {
 206        .lop_cancel     = lov_llog_repl_cancel,
 207};
 208
 209int lov_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
 210                  struct obd_device *disk_obd, int *index)
 211{
 212        struct lov_obd *lov = &obd->u.lov;
 213        struct obd_device *child;
 214        int i, rc = 0;
 215        ENTRY;
 216
 217        LASSERT(olg == &obd->obd_olg);
 218        rc = llog_setup(NULL, obd, olg, LLOG_MDS_OST_ORIG_CTXT, disk_obd,
 219                        &lov_mds_ost_orig_logops);
 220        if (rc)
 221                RETURN(rc);
 222
 223        rc = llog_setup(NULL, obd, olg, LLOG_SIZE_REPL_CTXT, disk_obd,
 224                        &lov_size_repl_logops);
 225        if (rc)
 226                GOTO(err_cleanup, rc);
 227
 228        obd_getref(obd);
 229        /* count may not match lov->desc.ld_tgt_count during dynamic ost add */
 230        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 231                if (!lov->lov_tgts[i])
 232                        continue;
 233
 234                if (index && i != *index)
 235                        continue;
 236
 237                child = lov->lov_tgts[i]->ltd_obd;
 238                rc = obd_llog_init(child, &child->obd_olg, disk_obd, &i);
 239                if (rc)
 240                        CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
 241                               "(rc=%d)\n", i, child->obd_name,
 242                               disk_obd->obd_name, rc);
 243                rc = 0;
 244        }
 245        obd_putref(obd);
 246        GOTO(err_cleanup, rc);
 247err_cleanup:
 248        if (rc) {
 249                struct llog_ctxt *ctxt =
 250                        llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
 251                if (ctxt)
 252                        llog_cleanup(NULL, ctxt);
 253                ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
 254                if (ctxt)
 255                        llog_cleanup(NULL, ctxt);
 256        }
 257        return rc;
 258}
 259
 260int lov_llog_finish(struct obd_device *obd, int count)
 261{
 262        struct llog_ctxt *ctxt;
 263
 264        ENTRY;
 265
 266        /* cleanup our llogs only if the ctxts have been setup
 267         * (client lov doesn't setup, mds lov does). */
 268        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
 269        if (ctxt)
 270                llog_cleanup(NULL, ctxt);
 271
 272        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
 273        if (ctxt)
 274                llog_cleanup(NULL, ctxt);
 275
 276        /* lov->tgt llogs are cleaned during osc_cleanup. */
 277        RETURN(0);
 278}
 279