linux/drivers/staging/lustre/lustre/obdclass/llog.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2012, 2015, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/obdclass/llog.c
  37 *
  38 * OST<->MDS recovery logging infrastructure.
  39 * Invariants in implementation:
  40 * - we do not share logs among different OST<->MDS connections, so that
  41 *   if an OST or MDS fails it need only look at log(s) relevant to itself
  42 *
  43 * Author: Andreas Dilger <adilger@clusterfs.com>
  44 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
  45 * Author: Mikhail Pershin <tappro@whamcloud.com>
  46 */
  47
  48#define DEBUG_SUBSYSTEM S_LOG
  49
  50#include "../include/obd_class.h"
  51#include "../include/lustre_log.h"
  52#include "llog_internal.h"
  53
  54/*
  55 * Allocate a new log or catalog handle
  56 * Used inside llog_open().
  57 */
  58static struct llog_handle *llog_alloc_handle(void)
  59{
  60        struct llog_handle *loghandle;
  61
  62        loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
  63        if (!loghandle)
  64                return NULL;
  65
  66        init_rwsem(&loghandle->lgh_lock);
  67        spin_lock_init(&loghandle->lgh_hdr_lock);
  68        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
  69        atomic_set(&loghandle->lgh_refcount, 1);
  70
  71        return loghandle;
  72}
  73
  74/*
  75 * Free llog handle and header data if exists. Used in llog_close() only
  76 */
  77static void llog_free_handle(struct llog_handle *loghandle)
  78{
  79        /* failed llog_init_handle */
  80        if (!loghandle->lgh_hdr)
  81                goto out;
  82
  83        if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
  84                LASSERT(list_empty(&loghandle->u.phd.phd_entry));
  85        else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
  86                LASSERT(list_empty(&loghandle->u.chd.chd_head));
  87        LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
  88        kfree(loghandle->lgh_hdr);
  89out:
  90        kfree(loghandle);
  91}
  92
  93void llog_handle_get(struct llog_handle *loghandle)
  94{
  95        atomic_inc(&loghandle->lgh_refcount);
  96}
  97
  98void llog_handle_put(struct llog_handle *loghandle)
  99{
 100        LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
 101        if (atomic_dec_and_test(&loghandle->lgh_refcount))
 102                llog_free_handle(loghandle);
 103}
 104
 105static int llog_read_header(const struct lu_env *env,
 106                            struct llog_handle *handle,
 107                            struct obd_uuid *uuid)
 108{
 109        struct llog_operations *lop;
 110        int rc;
 111
 112        rc = llog_handle2ops(handle, &lop);
 113        if (rc)
 114                return rc;
 115
 116        if (!lop->lop_read_header)
 117                return -EOPNOTSUPP;
 118
 119        rc = lop->lop_read_header(env, handle);
 120        if (rc == LLOG_EEMPTY) {
 121                struct llog_log_hdr *llh = handle->lgh_hdr;
 122
 123                handle->lgh_last_idx = 0; /* header is record with index 0 */
 124                llh->llh_count = 1;      /* for the header record */
 125                llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
 126                llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
 127                llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
 128                llh->llh_timestamp = ktime_get_real_seconds();
 129                if (uuid)
 130                        memcpy(&llh->llh_tgtuuid, uuid,
 131                               sizeof(llh->llh_tgtuuid));
 132                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
 133                ext2_set_bit(0, llh->llh_bitmap);
 134                rc = 0;
 135        }
 136        return rc;
 137}
 138
 139int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
 140                     int flags, struct obd_uuid *uuid)
 141{
 142        struct llog_log_hdr     *llh;
 143        int                      rc;
 144
 145        LASSERT(!handle->lgh_hdr);
 146
 147        llh = kzalloc(sizeof(*llh), GFP_NOFS);
 148        if (!llh)
 149                return -ENOMEM;
 150        handle->lgh_hdr = llh;
 151        /* first assign flags to use llog_client_ops */
 152        llh->llh_flags = flags;
 153        rc = llog_read_header(env, handle, uuid);
 154        if (rc == 0) {
 155                if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
 156                              flags & LLOG_F_IS_CAT) ||
 157                             (llh->llh_flags & LLOG_F_IS_CAT &&
 158                              flags & LLOG_F_IS_PLAIN))) {
 159                        CERROR("%s: llog type is %s but initializing %s\n",
 160                               handle->lgh_ctxt->loc_obd->obd_name,
 161                               llh->llh_flags & LLOG_F_IS_CAT ?
 162                               "catalog" : "plain",
 163                               flags & LLOG_F_IS_CAT ? "catalog" : "plain");
 164                        rc = -EINVAL;
 165                        goto out;
 166                } else if (llh->llh_flags &
 167                           (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
 168                        /*
 169                         * it is possible to open llog without specifying llog
 170                         * type so it is taken from llh_flags
 171                         */
 172                        flags = llh->llh_flags;
 173                } else {
 174                        /* for some reason the llh_flags has no type set */
 175                        CERROR("llog type is not specified!\n");
 176                        rc = -EINVAL;
 177                        goto out;
 178                }
 179                if (unlikely(uuid &&
 180                             !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
 181                        CERROR("%s: llog uuid mismatch: %s/%s\n",
 182                               handle->lgh_ctxt->loc_obd->obd_name,
 183                               (char *)uuid->uuid,
 184                               (char *)llh->llh_tgtuuid.uuid);
 185                        rc = -EEXIST;
 186                        goto out;
 187                }
 188        }
 189        if (flags & LLOG_F_IS_CAT) {
 190                LASSERT(list_empty(&handle->u.chd.chd_head));
 191                INIT_LIST_HEAD(&handle->u.chd.chd_head);
 192                llh->llh_size = sizeof(struct llog_logid_rec);
 193        } else if (!(flags & LLOG_F_IS_PLAIN)) {
 194                CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
 195                       handle->lgh_ctxt->loc_obd->obd_name,
 196                       flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
 197                rc = -EINVAL;
 198        }
 199out:
 200        if (rc) {
 201                kfree(llh);
 202                handle->lgh_hdr = NULL;
 203        }
 204        return rc;
 205}
 206EXPORT_SYMBOL(llog_init_handle);
 207
 208static int llog_process_thread(void *arg)
 209{
 210        struct llog_process_info        *lpi = arg;
 211        struct llog_handle              *loghandle = lpi->lpi_loghandle;
 212        struct llog_log_hdr             *llh = loghandle->lgh_hdr;
 213        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
 214        char                            *buf;
 215        __u64                            cur_offset = LLOG_CHUNK_SIZE;
 216        __u64                            last_offset;
 217        int                              rc = 0, index = 1, last_index;
 218        int                              saved_index = 0;
 219        int                              last_called_index = 0;
 220
 221        LASSERT(llh);
 222
 223        buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS);
 224        if (!buf) {
 225                lpi->lpi_rc = -ENOMEM;
 226                return 0;
 227        }
 228
 229        if (cd) {
 230                last_called_index = cd->lpcd_first_idx;
 231                index = cd->lpcd_first_idx + 1;
 232        }
 233        if (cd && cd->lpcd_last_idx)
 234                last_index = cd->lpcd_last_idx;
 235        else
 236                last_index = LLOG_BITMAP_BYTES * 8 - 1;
 237
 238        while (rc == 0) {
 239                struct llog_rec_hdr *rec;
 240
 241                /* skip records not set in bitmap */
 242                while (index <= last_index &&
 243                       !ext2_test_bit(index, llh->llh_bitmap))
 244                        ++index;
 245
 246                LASSERT(index <= last_index + 1);
 247                if (index == last_index + 1)
 248                        break;
 249repeat:
 250                CDEBUG(D_OTHER, "index: %d last_index %d\n",
 251                       index, last_index);
 252
 253                /* get the buf with our target record; avoid old garbage */
 254                memset(buf, 0, LLOG_CHUNK_SIZE);
 255                last_offset = cur_offset;
 256                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
 257                                     index, &cur_offset, buf, LLOG_CHUNK_SIZE);
 258                if (rc)
 259                        goto out;
 260
 261                /* NB: when rec->lrh_len is accessed it is already swabbed
 262                 * since it is used at the "end" of the loop and the rec
 263                 * swabbing is done at the beginning of the loop.
 264                 */
 265                for (rec = (struct llog_rec_hdr *)buf;
 266                     (char *)rec < buf + LLOG_CHUNK_SIZE;
 267                     rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)) {
 268
 269                        CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
 270                               rec, rec->lrh_type);
 271
 272                        if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
 273                                lustre_swab_llog_rec(rec);
 274
 275                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
 276                               rec->lrh_type, rec->lrh_index);
 277
 278                        if (rec->lrh_index == 0) {
 279                                /* probably another rec just got added? */
 280                                rc = 0;
 281                                if (index <= loghandle->lgh_last_idx)
 282                                        goto repeat;
 283                                goto out; /* no more records */
 284                        }
 285                        if (rec->lrh_len == 0 ||
 286                            rec->lrh_len > LLOG_CHUNK_SIZE) {
 287                                CWARN("invalid length %d in llog record for index %d/%d\n",
 288                                      rec->lrh_len,
 289                                      rec->lrh_index, index);
 290                                rc = -EINVAL;
 291                                goto out;
 292                        }
 293
 294                        if (rec->lrh_index < index) {
 295                                CDEBUG(D_OTHER, "skipping lrh_index %d\n",
 296                                       rec->lrh_index);
 297                                continue;
 298                        }
 299
 300                        CDEBUG(D_OTHER,
 301                               "lrh_index: %d lrh_len: %d (%d remains)\n",
 302                               rec->lrh_index, rec->lrh_len,
 303                               (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
 304
 305                        loghandle->lgh_cur_idx = rec->lrh_index;
 306                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
 307                                                    last_offset;
 308
 309                        /* if set, process the callback on this record */
 310                        if (ext2_test_bit(index, llh->llh_bitmap)) {
 311                                rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
 312                                                 lpi->lpi_cbdata);
 313                                last_called_index = index;
 314                                if (rc)
 315                                        goto out;
 316                        } else {
 317                                CDEBUG(D_OTHER, "Skipped index %d\n", index);
 318                        }
 319
 320                        /* next record, still in buffer? */
 321                        ++index;
 322                        if (index > last_index) {
 323                                rc = 0;
 324                                goto out;
 325                        }
 326                }
 327        }
 328
 329out:
 330        if (cd)
 331                cd->lpcd_last_idx = last_called_index;
 332
 333        kfree(buf);
 334        lpi->lpi_rc = rc;
 335        return 0;
 336}
 337
 338static int llog_process_thread_daemonize(void *arg)
 339{
 340        struct llog_process_info        *lpi = arg;
 341        struct lu_env                    env;
 342        int                              rc;
 343
 344        unshare_fs_struct();
 345
 346        /* client env has no keys, tags is just 0 */
 347        rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
 348        if (rc)
 349                goto out;
 350        lpi->lpi_env = &env;
 351
 352        rc = llog_process_thread(arg);
 353
 354        lu_env_fini(&env);
 355out:
 356        complete(&lpi->lpi_completion);
 357        return rc;
 358}
 359
 360int llog_process_or_fork(const struct lu_env *env,
 361                         struct llog_handle *loghandle,
 362                         llog_cb_t cb, void *data, void *catdata, bool fork)
 363{
 364        struct llog_process_info *lpi;
 365        int                   rc;
 366
 367        lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
 368        if (!lpi)
 369                return -ENOMEM;
 370        lpi->lpi_loghandle = loghandle;
 371        lpi->lpi_cb     = cb;
 372        lpi->lpi_cbdata    = data;
 373        lpi->lpi_catdata   = catdata;
 374
 375        if (fork) {
 376                struct task_struct *task;
 377
 378                /* The new thread can't use parent env,
 379                 * init the new one in llog_process_thread_daemonize.
 380                 */
 381                lpi->lpi_env = NULL;
 382                init_completion(&lpi->lpi_completion);
 383                task = kthread_run(llog_process_thread_daemonize, lpi,
 384                                   "llog_process_thread");
 385                if (IS_ERR(task)) {
 386                        rc = PTR_ERR(task);
 387                        CERROR("%s: cannot start thread: rc = %d\n",
 388                               loghandle->lgh_ctxt->loc_obd->obd_name, rc);
 389                        goto out_lpi;
 390                }
 391                wait_for_completion(&lpi->lpi_completion);
 392        } else {
 393                lpi->lpi_env = env;
 394                llog_process_thread(lpi);
 395        }
 396        rc = lpi->lpi_rc;
 397out_lpi:
 398        kfree(lpi);
 399        return rc;
 400}
 401EXPORT_SYMBOL(llog_process_or_fork);
 402
 403int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
 404                 llog_cb_t cb, void *data, void *catdata)
 405{
 406        return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
 407}
 408EXPORT_SYMBOL(llog_process);
 409
 410int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
 411              struct llog_handle **lgh, struct llog_logid *logid,
 412              char *name, enum llog_open_param open_param)
 413{
 414        int      raised;
 415        int      rc;
 416
 417        LASSERT(ctxt);
 418        LASSERT(ctxt->loc_logops);
 419
 420        if (!ctxt->loc_logops->lop_open) {
 421                *lgh = NULL;
 422                return -EOPNOTSUPP;
 423        }
 424
 425        *lgh = llog_alloc_handle();
 426        if (!*lgh)
 427                return -ENOMEM;
 428        (*lgh)->lgh_ctxt = ctxt;
 429        (*lgh)->lgh_logops = ctxt->loc_logops;
 430
 431        raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
 432        if (!raised)
 433                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
 434        rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
 435        if (!raised)
 436                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 437        if (rc) {
 438                llog_free_handle(*lgh);
 439                *lgh = NULL;
 440        }
 441        return rc;
 442}
 443EXPORT_SYMBOL(llog_open);
 444
 445int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
 446{
 447        struct llog_operations  *lop;
 448        int                      rc;
 449
 450        rc = llog_handle2ops(loghandle, &lop);
 451        if (rc)
 452                goto out;
 453        if (!lop->lop_close) {
 454                rc = -EOPNOTSUPP;
 455                goto out;
 456        }
 457        rc = lop->lop_close(env, loghandle);
 458out:
 459        llog_handle_put(loghandle);
 460        return rc;
 461}
 462EXPORT_SYMBOL(llog_close);
 463