linux/fs/xfs/xfs_log_cil.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
   4 */
   5
   6#include "xfs.h"
   7#include "xfs_fs.h"
   8#include "xfs_format.h"
   9#include "xfs_log_format.h"
  10#include "xfs_shared.h"
  11#include "xfs_trans_resv.h"
  12#include "xfs_mount.h"
  13#include "xfs_extent_busy.h"
  14#include "xfs_trans.h"
  15#include "xfs_trans_priv.h"
  16#include "xfs_log.h"
  17#include "xfs_log_priv.h"
  18#include "xfs_trace.h"
  19
  20struct workqueue_struct *xfs_discard_wq;
  21
  22/*
  23 * Allocate a new ticket. Failing to get a new ticket makes it really hard to
  24 * recover, so we don't allow failure here. Also, we allocate in a context that
  25 * we don't want to be issuing transactions from, so we need to tell the
  26 * allocation code this as well.
  27 *
  28 * We don't reserve any space for the ticket - we are going to steal whatever
  29 * space we require from transactions as they commit. To ensure we reserve all
  30 * the space required, we need to set the current reservation of the ticket to
  31 * zero so that we know to steal the initial transaction overhead from the
  32 * first transaction commit.
  33 */
  34static struct xlog_ticket *
  35xlog_cil_ticket_alloc(
  36        struct xlog     *log)
  37{
  38        struct xlog_ticket *tic;
  39
  40        tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0,
  41                                KM_NOFS);
  42
  43        /*
  44         * set the current reservation to zero so we know to steal the basic
  45         * transaction overhead reservation from the first transaction commit.
  46         */
  47        tic->t_curr_res = 0;
  48        return tic;
  49}
  50
  51/*
  52 * After the first stage of log recovery is done, we know where the head and
  53 * tail of the log are. We need this log initialisation done before we can
  54 * initialise the first CIL checkpoint context.
  55 *
  56 * Here we allocate a log ticket to track space usage during a CIL push.  This
  57 * ticket is passed to xlog_write() directly so that we don't slowly leak log
  58 * space by failing to account for space used by log headers and additional
  59 * region headers for split regions.
  60 */
  61void
  62xlog_cil_init_post_recovery(
  63        struct xlog     *log)
  64{
  65        log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
  66        log->l_cilp->xc_ctx->sequence = 1;
  67}
  68
  69static inline int
  70xlog_cil_iovec_space(
  71        uint    niovecs)
  72{
  73        return round_up((sizeof(struct xfs_log_vec) +
  74                                        niovecs * sizeof(struct xfs_log_iovec)),
  75                        sizeof(uint64_t));
  76}
  77
  78/*
  79 * Allocate or pin log vector buffers for CIL insertion.
  80 *
  81 * The CIL currently uses disposable buffers for copying a snapshot of the
  82 * modified items into the log during a push. The biggest problem with this is
  83 * the requirement to allocate the disposable buffer during the commit if:
  84 *      a) does not exist; or
  85 *      b) it is too small
  86 *
  87 * If we do this allocation within xlog_cil_insert_format_items(), it is done
  88 * under the xc_ctx_lock, which means that a CIL push cannot occur during
  89 * the memory allocation. This means that we have a potential deadlock situation
  90 * under low memory conditions when we have lots of dirty metadata pinned in
  91 * the CIL and we need a CIL commit to occur to free memory.
  92 *
  93 * To avoid this, we need to move the memory allocation outside the
  94 * xc_ctx_lock, but because the log vector buffers are disposable, that opens
  95 * up a TOCTOU race condition w.r.t. the CIL committing and removing the log
  96 * vector buffers between the check and the formatting of the item into the
  97 * log vector buffer within the xc_ctx_lock.
  98 *
  99 * Because the log vector buffer needs to be unchanged during the CIL push
 100 * process, we cannot share the buffer between the transaction commit (which
 101 * modifies the buffer) and the CIL push context that is writing the changes
 102 * into the log. This means skipping preallocation of buffer space is
 103 * unreliable, but we most definitely do not want to be allocating and freeing
 104 * buffers unnecessarily during commits when overwrites can be done safely.
 105 *
 106 * The simplest solution to this problem is to allocate a shadow buffer when a
 107 * log item is committed for the second time, and then to only use this buffer
 108 * if necessary. The buffer can remain attached to the log item until such time
 109 * it is needed, and this is the buffer that is reallocated to match the size of
 110 * the incoming modification. Then during the formatting of the item we can swap
 111 * the active buffer with the new one if we can't reuse the existing buffer. We
 112 * don't free the old buffer as it may be reused on the next modification if
 113 * it's size is right, otherwise we'll free and reallocate it at that point.
 114 *
 115 * This function builds a vector for the changes in each log item in the
 116 * transaction. It then works out the length of the buffer needed for each log
 117 * item, allocates them and attaches the vector to the log item in preparation
 118 * for the formatting step which occurs under the xc_ctx_lock.
 119 *
 120 * While this means the memory footprint goes up, it avoids the repeated
 121 * alloc/free pattern that repeated modifications of an item would otherwise
 122 * cause, and hence minimises the CPU overhead of such behaviour.
 123 */
 124static void
 125xlog_cil_alloc_shadow_bufs(
 126        struct xlog             *log,
 127        struct xfs_trans        *tp)
 128{
 129        struct xfs_log_item     *lip;
 130
 131        list_for_each_entry(lip, &tp->t_items, li_trans) {
 132                struct xfs_log_vec *lv;
 133                int     niovecs = 0;
 134                int     nbytes = 0;
 135                int     buf_size;
 136                bool    ordered = false;
 137
 138                /* Skip items which aren't dirty in this transaction. */
 139                if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
 140                        continue;
 141
 142                /* get number of vecs and size of data to be stored */
 143                lip->li_ops->iop_size(lip, &niovecs, &nbytes);
 144
 145                /*
 146                 * Ordered items need to be tracked but we do not wish to write
 147                 * them. We need a logvec to track the object, but we do not
 148                 * need an iovec or buffer to be allocated for copying data.
 149                 */
 150                if (niovecs == XFS_LOG_VEC_ORDERED) {
 151                        ordered = true;
 152                        niovecs = 0;
 153                        nbytes = 0;
 154                }
 155
 156                /*
 157                 * We 64-bit align the length of each iovec so that the start
 158                 * of the next one is naturally aligned.  We'll need to
 159                 * account for that slack space here. Then round nbytes up
 160                 * to 64-bit alignment so that the initial buffer alignment is
 161                 * easy to calculate and verify.
 162                 */
 163                nbytes += niovecs * sizeof(uint64_t);
 164                nbytes = round_up(nbytes, sizeof(uint64_t));
 165
 166                /*
 167                 * The data buffer needs to start 64-bit aligned, so round up
 168                 * that space to ensure we can align it appropriately and not
 169                 * overrun the buffer.
 170                 */
 171                buf_size = nbytes + xlog_cil_iovec_space(niovecs);
 172
 173                /*
 174                 * if we have no shadow buffer, or it is too small, we need to
 175                 * reallocate it.
 176                 */
 177                if (!lip->li_lv_shadow ||
 178                    buf_size > lip->li_lv_shadow->lv_size) {
 179
 180                        /*
 181                         * We free and allocate here as a realloc would copy
 182                         * unnecessary data. We don't use kmem_zalloc() for the
 183                         * same reason - we don't need to zero the data area in
 184                         * the buffer, only the log vector header and the iovec
 185                         * storage.
 186                         */
 187                        kmem_free(lip->li_lv_shadow);
 188
 189                        lv = kmem_alloc_large(buf_size, KM_NOFS);
 190                        memset(lv, 0, xlog_cil_iovec_space(niovecs));
 191
 192                        lv->lv_item = lip;
 193                        lv->lv_size = buf_size;
 194                        if (ordered)
 195                                lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
 196                        else
 197                                lv->lv_iovecp = (struct xfs_log_iovec *)&lv[1];
 198                        lip->li_lv_shadow = lv;
 199                } else {
 200                        /* same or smaller, optimise common overwrite case */
 201                        lv = lip->li_lv_shadow;
 202                        if (ordered)
 203                                lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
 204                        else
 205                                lv->lv_buf_len = 0;
 206                        lv->lv_bytes = 0;
 207                        lv->lv_next = NULL;
 208                }
 209
 210                /* Ensure the lv is set up according to ->iop_size */
 211                lv->lv_niovecs = niovecs;
 212
 213                /* The allocated data region lies beyond the iovec region */
 214                lv->lv_buf = (char *)lv + xlog_cil_iovec_space(niovecs);
 215        }
 216
 217}
 218
 219/*
 220 * Prepare the log item for insertion into the CIL. Calculate the difference in
 221 * log space and vectors it will consume, and if it is a new item pin it as
 222 * well.
 223 */
 224STATIC void
 225xfs_cil_prepare_item(
 226        struct xlog             *log,
 227        struct xfs_log_vec      *lv,
 228        struct xfs_log_vec      *old_lv,
 229        int                     *diff_len,
 230        int                     *diff_iovecs)
 231{
 232        /* Account for the new LV being passed in */
 233        if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) {
 234                *diff_len += lv->lv_bytes;
 235                *diff_iovecs += lv->lv_niovecs;
 236        }
 237
 238        /*
 239         * If there is no old LV, this is the first time we've seen the item in
 240         * this CIL context and so we need to pin it. If we are replacing the
 241         * old_lv, then remove the space it accounts for and make it the shadow
 242         * buffer for later freeing. In both cases we are now switching to the
 243         * shadow buffer, so update the the pointer to it appropriately.
 244         */
 245        if (!old_lv) {
 246                if (lv->lv_item->li_ops->iop_pin)
 247                        lv->lv_item->li_ops->iop_pin(lv->lv_item);
 248                lv->lv_item->li_lv_shadow = NULL;
 249        } else if (old_lv != lv) {
 250                ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);
 251
 252                *diff_len -= old_lv->lv_bytes;
 253                *diff_iovecs -= old_lv->lv_niovecs;
 254                lv->lv_item->li_lv_shadow = old_lv;
 255        }
 256
 257        /* attach new log vector to log item */
 258        lv->lv_item->li_lv = lv;
 259
 260        /*
 261         * If this is the first time the item is being committed to the
 262         * CIL, store the sequence number on the log item so we can
 263         * tell in future commits whether this is the first checkpoint
 264         * the item is being committed into.
 265         */
 266        if (!lv->lv_item->li_seq)
 267                lv->lv_item->li_seq = log->l_cilp->xc_ctx->sequence;
 268}
 269
 270/*
 271 * Format log item into a flat buffers
 272 *
 273 * For delayed logging, we need to hold a formatted buffer containing all the
 274 * changes on the log item. This enables us to relog the item in memory and
 275 * write it out asynchronously without needing to relock the object that was
 276 * modified at the time it gets written into the iclog.
 277 *
 278 * This function takes the prepared log vectors attached to each log item, and
 279 * formats the changes into the log vector buffer. The buffer it uses is
 280 * dependent on the current state of the vector in the CIL - the shadow lv is
 281 * guaranteed to be large enough for the current modification, but we will only
 282 * use that if we can't reuse the existing lv. If we can't reuse the existing
 283 * lv, then simple swap it out for the shadow lv. We don't free it - that is
 284 * done lazily either by th enext modification or the freeing of the log item.
 285 *
 286 * We don't set up region headers during this process; we simply copy the
 287 * regions into the flat buffer. We can do this because we still have to do a
 288 * formatting step to write the regions into the iclog buffer.  Writing the
 289 * ophdrs during the iclog write means that we can support splitting large
 290 * regions across iclog boundares without needing a change in the format of the
 291 * item/region encapsulation.
 292 *
 293 * Hence what we need to do now is change the rewrite the vector array to point
 294 * to the copied region inside the buffer we just allocated. This allows us to
 295 * format the regions into the iclog as though they are being formatted
 296 * directly out of the objects themselves.
 297 */
 298static void
 299xlog_cil_insert_format_items(
 300        struct xlog             *log,
 301        struct xfs_trans        *tp,
 302        int                     *diff_len,
 303        int                     *diff_iovecs)
 304{
 305        struct xfs_log_item     *lip;
 306
 307
 308        /* Bail out if we didn't find a log item.  */
 309        if (list_empty(&tp->t_items)) {
 310                ASSERT(0);
 311                return;
 312        }
 313
 314        list_for_each_entry(lip, &tp->t_items, li_trans) {
 315                struct xfs_log_vec *lv;
 316                struct xfs_log_vec *old_lv = NULL;
 317                struct xfs_log_vec *shadow;
 318                bool    ordered = false;
 319
 320                /* Skip items which aren't dirty in this transaction. */
 321                if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
 322                        continue;
 323
 324                /*
 325                 * The formatting size information is already attached to
 326                 * the shadow lv on the log item.
 327                 */
 328                shadow = lip->li_lv_shadow;
 329                if (shadow->lv_buf_len == XFS_LOG_VEC_ORDERED)
 330                        ordered = true;
 331
 332                /* Skip items that do not have any vectors for writing */
 333                if (!shadow->lv_niovecs && !ordered)
 334                        continue;
 335
 336                /* compare to existing item size */
 337                old_lv = lip->li_lv;
 338                if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
 339                        /* same or smaller, optimise common overwrite case */
 340                        lv = lip->li_lv;
 341                        lv->lv_next = NULL;
 342
 343                        if (ordered)
 344                                goto insert;
 345
 346                        /*
 347                         * set the item up as though it is a new insertion so
 348                         * that the space reservation accounting is correct.
 349                         */
 350                        *diff_iovecs -= lv->lv_niovecs;
 351                        *diff_len -= lv->lv_bytes;
 352
 353                        /* Ensure the lv is set up according to ->iop_size */
 354                        lv->lv_niovecs = shadow->lv_niovecs;
 355
 356                        /* reset the lv buffer information for new formatting */
 357                        lv->lv_buf_len = 0;
 358                        lv->lv_bytes = 0;
 359                        lv->lv_buf = (char *)lv +
 360                                        xlog_cil_iovec_space(lv->lv_niovecs);
 361                } else {
 362                        /* switch to shadow buffer! */
 363                        lv = shadow;
 364                        lv->lv_item = lip;
 365                        if (ordered) {
 366                                /* track as an ordered logvec */
 367                                ASSERT(lip->li_lv == NULL);
 368                                goto insert;
 369                        }
 370                }
 371
 372                ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
 373                lip->li_ops->iop_format(lip, lv);
 374insert:
 375                xfs_cil_prepare_item(log, lv, old_lv, diff_len, diff_iovecs);
 376        }
 377}
 378
 379/*
 380 * Insert the log items into the CIL and calculate the difference in space
 381 * consumed by the item. Add the space to the checkpoint ticket and calculate
 382 * if the change requires additional log metadata. If it does, take that space
 383 * as well. Remove the amount of space we added to the checkpoint ticket from
 384 * the current transaction ticket so that the accounting works out correctly.
 385 */
 386static void
 387xlog_cil_insert_items(
 388        struct xlog             *log,
 389        struct xfs_trans        *tp)
 390{
 391        struct xfs_cil          *cil = log->l_cilp;
 392        struct xfs_cil_ctx      *ctx = cil->xc_ctx;
 393        struct xfs_log_item     *lip;
 394        int                     len = 0;
 395        int                     diff_iovecs = 0;
 396        int                     iclog_space;
 397        int                     iovhdr_res = 0, split_res = 0, ctx_res = 0;
 398
 399        ASSERT(tp);
 400
 401        /*
 402         * We can do this safely because the context can't checkpoint until we
 403         * are done so it doesn't matter exactly how we update the CIL.
 404         */
 405        xlog_cil_insert_format_items(log, tp, &len, &diff_iovecs);
 406
 407        spin_lock(&cil->xc_cil_lock);
 408
 409        /* account for space used by new iovec headers  */
 410        iovhdr_res = diff_iovecs * sizeof(xlog_op_header_t);
 411        len += iovhdr_res;
 412        ctx->nvecs += diff_iovecs;
 413
 414        /* attach the transaction to the CIL if it has any busy extents */
 415        if (!list_empty(&tp->t_busy))
 416                list_splice_init(&tp->t_busy, &ctx->busy_extents);
 417
 418        /*
 419         * Now transfer enough transaction reservation to the context ticket
 420         * for the checkpoint. The context ticket is special - the unit
 421         * reservation has to grow as well as the current reservation as we
 422         * steal from tickets so we can correctly determine the space used
 423         * during the transaction commit.
 424         */
 425        if (ctx->ticket->t_curr_res == 0) {
 426                ctx_res = ctx->ticket->t_unit_res;
 427                ctx->ticket->t_curr_res = ctx_res;
 428                tp->t_ticket->t_curr_res -= ctx_res;
 429        }
 430
 431        /* do we need space for more log record headers? */
 432        iclog_space = log->l_iclog_size - log->l_iclog_hsize;
 433        if (len > 0 && (ctx->space_used / iclog_space !=
 434                                (ctx->space_used + len) / iclog_space)) {
 435                split_res = (len + iclog_space - 1) / iclog_space;
 436                /* need to take into account split region headers, too */
 437                split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
 438                ctx->ticket->t_unit_res += split_res;
 439                ctx->ticket->t_curr_res += split_res;
 440                tp->t_ticket->t_curr_res -= split_res;
 441                ASSERT(tp->t_ticket->t_curr_res >= len);
 442        }
 443        tp->t_ticket->t_curr_res -= len;
 444        ctx->space_used += len;
 445
 446        /*
 447         * If we've overrun the reservation, dump the tx details before we move
 448         * the log items. Shutdown is imminent...
 449         */
 450        if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
 451                xfs_warn(log->l_mp, "Transaction log reservation overrun:");
 452                xfs_warn(log->l_mp,
 453                         "  log items: %d bytes (iov hdrs: %d bytes)",
 454                         len, iovhdr_res);
 455                xfs_warn(log->l_mp, "  split region headers: %d bytes",
 456                         split_res);
 457                xfs_warn(log->l_mp, "  ctx ticket: %d bytes", ctx_res);
 458                xlog_print_trans(tp);
 459        }
 460
 461        /*
 462         * Now (re-)position everything modified at the tail of the CIL.
 463         * We do this here so we only need to take the CIL lock once during
 464         * the transaction commit.
 465         */
 466        list_for_each_entry(lip, &tp->t_items, li_trans) {
 467
 468                /* Skip items which aren't dirty in this transaction. */
 469                if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
 470                        continue;
 471
 472                /*
 473                 * Only move the item if it isn't already at the tail. This is
 474                 * to prevent a transient list_empty() state when reinserting
 475                 * an item that is already the only item in the CIL.
 476                 */
 477                if (!list_is_last(&lip->li_cil, &cil->xc_cil))
 478                        list_move_tail(&lip->li_cil, &cil->xc_cil);
 479        }
 480
 481        spin_unlock(&cil->xc_cil_lock);
 482
 483        if (tp->t_ticket->t_curr_res < 0)
 484                xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
 485}
 486
 487static void
 488xlog_cil_free_logvec(
 489        struct xfs_log_vec      *log_vector)
 490{
 491        struct xfs_log_vec      *lv;
 492
 493        for (lv = log_vector; lv; ) {
 494                struct xfs_log_vec *next = lv->lv_next;
 495                kmem_free(lv);
 496                lv = next;
 497        }
 498}
 499
 500static void
 501xlog_discard_endio_work(
 502        struct work_struct      *work)
 503{
 504        struct xfs_cil_ctx      *ctx =
 505                container_of(work, struct xfs_cil_ctx, discard_endio_work);
 506        struct xfs_mount        *mp = ctx->cil->xc_log->l_mp;
 507
 508        xfs_extent_busy_clear(mp, &ctx->busy_extents, false);
 509        kmem_free(ctx);
 510}
 511
 512/*
 513 * Queue up the actual completion to a thread to avoid IRQ-safe locking for
 514 * pagb_lock.  Note that we need a unbounded workqueue, otherwise we might
 515 * get the execution delayed up to 30 seconds for weird reasons.
 516 */
 517static void
 518xlog_discard_endio(
 519        struct bio              *bio)
 520{
 521        struct xfs_cil_ctx      *ctx = bio->bi_private;
 522
 523        INIT_WORK(&ctx->discard_endio_work, xlog_discard_endio_work);
 524        queue_work(xfs_discard_wq, &ctx->discard_endio_work);
 525        bio_put(bio);
 526}
 527
 528static void
 529xlog_discard_busy_extents(
 530        struct xfs_mount        *mp,
 531        struct xfs_cil_ctx      *ctx)
 532{
 533        struct list_head        *list = &ctx->busy_extents;
 534        struct xfs_extent_busy  *busyp;
 535        struct bio              *bio = NULL;
 536        struct blk_plug         plug;
 537        int                     error = 0;
 538
 539        ASSERT(mp->m_flags & XFS_MOUNT_DISCARD);
 540
 541        blk_start_plug(&plug);
 542        list_for_each_entry(busyp, list, list) {
 543                trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
 544                                         busyp->length);
 545
 546                error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
 547                                XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
 548                                XFS_FSB_TO_BB(mp, busyp->length),
 549                                GFP_NOFS, 0, &bio);
 550                if (error && error != -EOPNOTSUPP) {
 551                        xfs_info(mp,
 552         "discard failed for extent [0x%llx,%u], error %d",
 553                                 (unsigned long long)busyp->bno,
 554                                 busyp->length,
 555                                 error);
 556                        break;
 557                }
 558        }
 559
 560        if (bio) {
 561                bio->bi_private = ctx;
 562                bio->bi_end_io = xlog_discard_endio;
 563                submit_bio(bio);
 564        } else {
 565                xlog_discard_endio_work(&ctx->discard_endio_work);
 566        }
 567        blk_finish_plug(&plug);
 568}
 569
 570/*
 571 * Mark all items committed and clear busy extents. We free the log vector
 572 * chains in a separate pass so that we unpin the log items as quickly as
 573 * possible.
 574 */
 575static void
 576xlog_cil_committed(
 577        struct xfs_cil_ctx      *ctx,
 578        bool                    abort)
 579{
 580        struct xfs_mount        *mp = ctx->cil->xc_log->l_mp;
 581
 582        /*
 583         * If the I/O failed, we're aborting the commit and already shutdown.
 584         * Wake any commit waiters before aborting the log items so we don't
 585         * block async log pushers on callbacks. Async log pushers explicitly do
 586         * not wait on log force completion because they may be holding locks
 587         * required to unpin items.
 588         */
 589        if (abort) {
 590                spin_lock(&ctx->cil->xc_push_lock);
 591                wake_up_all(&ctx->cil->xc_commit_wait);
 592                spin_unlock(&ctx->cil->xc_push_lock);
 593        }
 594
 595        xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
 596                                        ctx->start_lsn, abort);
 597
 598        xfs_extent_busy_sort(&ctx->busy_extents);
 599        xfs_extent_busy_clear(mp, &ctx->busy_extents,
 600                             (mp->m_flags & XFS_MOUNT_DISCARD) && !abort);
 601
 602        spin_lock(&ctx->cil->xc_push_lock);
 603        list_del(&ctx->committing);
 604        spin_unlock(&ctx->cil->xc_push_lock);
 605
 606        xlog_cil_free_logvec(ctx->lv_chain);
 607
 608        if (!list_empty(&ctx->busy_extents))
 609                xlog_discard_busy_extents(mp, ctx);
 610        else
 611                kmem_free(ctx);
 612}
 613
 614void
 615xlog_cil_process_committed(
 616        struct list_head        *list,
 617        bool                    aborted)
 618{
 619        struct xfs_cil_ctx      *ctx;
 620
 621        while ((ctx = list_first_entry_or_null(list,
 622                        struct xfs_cil_ctx, iclog_entry))) {
 623                list_del(&ctx->iclog_entry);
 624                xlog_cil_committed(ctx, aborted);
 625        }
 626}
 627
 628/*
 629 * Push the Committed Item List to the log. If @push_seq flag is zero, then it
 630 * is a background flush and so we can chose to ignore it. Otherwise, if the
 631 * current sequence is the same as @push_seq we need to do a flush. If
 632 * @push_seq is less than the current sequence, then it has already been
 633 * flushed and we don't need to do anything - the caller will wait for it to
 634 * complete if necessary.
 635 *
 636 * @push_seq is a value rather than a flag because that allows us to do an
 637 * unlocked check of the sequence number for a match. Hence we can allows log
 638 * forces to run racily and not issue pushes for the same sequence twice. If we
 639 * get a race between multiple pushes for the same sequence they will block on
 640 * the first one and then abort, hence avoiding needless pushes.
 641 */
 642STATIC int
 643xlog_cil_push(
 644        struct xlog             *log)
 645{
 646        struct xfs_cil          *cil = log->l_cilp;
 647        struct xfs_log_vec      *lv;
 648        struct xfs_cil_ctx      *ctx;
 649        struct xfs_cil_ctx      *new_ctx;
 650        struct xlog_in_core     *commit_iclog;
 651        struct xlog_ticket      *tic;
 652        int                     num_iovecs;
 653        int                     error = 0;
 654        struct xfs_trans_header thdr;
 655        struct xfs_log_iovec    lhdr;
 656        struct xfs_log_vec      lvhdr = { NULL };
 657        xfs_lsn_t               commit_lsn;
 658        xfs_lsn_t               push_seq;
 659
 660        if (!cil)
 661                return 0;
 662
 663        new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_NOFS);
 664        new_ctx->ticket = xlog_cil_ticket_alloc(log);
 665
 666        down_write(&cil->xc_ctx_lock);
 667        ctx = cil->xc_ctx;
 668
 669        spin_lock(&cil->xc_push_lock);
 670        push_seq = cil->xc_push_seq;
 671        ASSERT(push_seq <= ctx->sequence);
 672
 673        /*
 674         * Check if we've anything to push. If there is nothing, then we don't
 675         * move on to a new sequence number and so we have to be able to push
 676         * this sequence again later.
 677         */
 678        if (list_empty(&cil->xc_cil)) {
 679                cil->xc_push_seq = 0;
 680                spin_unlock(&cil->xc_push_lock);
 681                goto out_skip;
 682        }
 683
 684
 685        /* check for a previously pushed sequence */
 686        if (push_seq < cil->xc_ctx->sequence) {
 687                spin_unlock(&cil->xc_push_lock);
 688                goto out_skip;
 689        }
 690
 691        /*
 692         * We are now going to push this context, so add it to the committing
 693         * list before we do anything else. This ensures that anyone waiting on
 694         * this push can easily detect the difference between a "push in
 695         * progress" and "CIL is empty, nothing to do".
 696         *
 697         * IOWs, a wait loop can now check for:
 698         *      the current sequence not being found on the committing list;
 699         *      an empty CIL; and
 700         *      an unchanged sequence number
 701         * to detect a push that had nothing to do and therefore does not need
 702         * waiting on. If the CIL is not empty, we get put on the committing
 703         * list before emptying the CIL and bumping the sequence number. Hence
 704         * an empty CIL and an unchanged sequence number means we jumped out
 705         * above after doing nothing.
 706         *
 707         * Hence the waiter will either find the commit sequence on the
 708         * committing list or the sequence number will be unchanged and the CIL
 709         * still dirty. In that latter case, the push has not yet started, and
 710         * so the waiter will have to continue trying to check the CIL
 711         * committing list until it is found. In extreme cases of delay, the
 712         * sequence may fully commit between the attempts the wait makes to wait
 713         * on the commit sequence.
 714         */
 715        list_add(&ctx->committing, &cil->xc_committing);
 716        spin_unlock(&cil->xc_push_lock);
 717
 718        /*
 719         * pull all the log vectors off the items in the CIL, and
 720         * remove the items from the CIL. We don't need the CIL lock
 721         * here because it's only needed on the transaction commit
 722         * side which is currently locked out by the flush lock.
 723         */
 724        lv = NULL;
 725        num_iovecs = 0;
 726        while (!list_empty(&cil->xc_cil)) {
 727                struct xfs_log_item     *item;
 728
 729                item = list_first_entry(&cil->xc_cil,
 730                                        struct xfs_log_item, li_cil);
 731                list_del_init(&item->li_cil);
 732                if (!ctx->lv_chain)
 733                        ctx->lv_chain = item->li_lv;
 734                else
 735                        lv->lv_next = item->li_lv;
 736                lv = item->li_lv;
 737                item->li_lv = NULL;
 738                num_iovecs += lv->lv_niovecs;
 739        }
 740
 741        /*
 742         * initialise the new context and attach it to the CIL. Then attach
 743         * the current context to the CIL committing lsit so it can be found
 744         * during log forces to extract the commit lsn of the sequence that
 745         * needs to be forced.
 746         */
 747        INIT_LIST_HEAD(&new_ctx->committing);
 748        INIT_LIST_HEAD(&new_ctx->busy_extents);
 749        new_ctx->sequence = ctx->sequence + 1;
 750        new_ctx->cil = cil;
 751        cil->xc_ctx = new_ctx;
 752
 753        /*
 754         * The switch is now done, so we can drop the context lock and move out
 755         * of a shared context. We can't just go straight to the commit record,
 756         * though - we need to synchronise with previous and future commits so
 757         * that the commit records are correctly ordered in the log to ensure
 758         * that we process items during log IO completion in the correct order.
 759         *
 760         * For example, if we get an EFI in one checkpoint and the EFD in the
 761         * next (e.g. due to log forces), we do not want the checkpoint with
 762         * the EFD to be committed before the checkpoint with the EFI.  Hence
 763         * we must strictly order the commit records of the checkpoints so
 764         * that: a) the checkpoint callbacks are attached to the iclogs in the
 765         * correct order; and b) the checkpoints are replayed in correct order
 766         * in log recovery.
 767         *
 768         * Hence we need to add this context to the committing context list so
 769         * that higher sequences will wait for us to write out a commit record
 770         * before they do.
 771         *
 772         * xfs_log_force_lsn requires us to mirror the new sequence into the cil
 773         * structure atomically with the addition of this sequence to the
 774         * committing list. This also ensures that we can do unlocked checks
 775         * against the current sequence in log forces without risking
 776         * deferencing a freed context pointer.
 777         */
 778        spin_lock(&cil->xc_push_lock);
 779        cil->xc_current_sequence = new_ctx->sequence;
 780        spin_unlock(&cil->xc_push_lock);
 781        up_write(&cil->xc_ctx_lock);
 782
 783        /*
 784         * Build a checkpoint transaction header and write it to the log to
 785         * begin the transaction. We need to account for the space used by the
 786         * transaction header here as it is not accounted for in xlog_write().
 787         *
 788         * The LSN we need to pass to the log items on transaction commit is
 789         * the LSN reported by the first log vector write. If we use the commit
 790         * record lsn then we can move the tail beyond the grant write head.
 791         */
 792        tic = ctx->ticket;
 793        thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
 794        thdr.th_type = XFS_TRANS_CHECKPOINT;
 795        thdr.th_tid = tic->t_tid;
 796        thdr.th_num_items = num_iovecs;
 797        lhdr.i_addr = &thdr;
 798        lhdr.i_len = sizeof(xfs_trans_header_t);
 799        lhdr.i_type = XLOG_REG_TYPE_TRANSHDR;
 800        tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t);
 801
 802        lvhdr.lv_niovecs = 1;
 803        lvhdr.lv_iovecp = &lhdr;
 804        lvhdr.lv_next = ctx->lv_chain;
 805
 806        error = xlog_write(log, &lvhdr, tic, &ctx->start_lsn, NULL, 0);
 807        if (error)
 808                goto out_abort_free_ticket;
 809
 810        /*
 811         * now that we've written the checkpoint into the log, strictly
 812         * order the commit records so replay will get them in the right order.
 813         */
 814restart:
 815        spin_lock(&cil->xc_push_lock);
 816        list_for_each_entry(new_ctx, &cil->xc_committing, committing) {
 817                /*
 818                 * Avoid getting stuck in this loop because we were woken by the
 819                 * shutdown, but then went back to sleep once already in the
 820                 * shutdown state.
 821                 */
 822                if (XLOG_FORCED_SHUTDOWN(log)) {
 823                        spin_unlock(&cil->xc_push_lock);
 824                        goto out_abort_free_ticket;
 825                }
 826
 827                /*
 828                 * Higher sequences will wait for this one so skip them.
 829                 * Don't wait for our own sequence, either.
 830                 */
 831                if (new_ctx->sequence >= ctx->sequence)
 832                        continue;
 833                if (!new_ctx->commit_lsn) {
 834                        /*
 835                         * It is still being pushed! Wait for the push to
 836                         * complete, then start again from the beginning.
 837                         */
 838                        xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
 839                        goto restart;
 840                }
 841        }
 842        spin_unlock(&cil->xc_push_lock);
 843
 844        /* xfs_log_done always frees the ticket on error. */
 845        commit_lsn = xfs_log_done(log->l_mp, tic, &commit_iclog, false);
 846        if (commit_lsn == -1)
 847                goto out_abort;
 848
 849        spin_lock(&commit_iclog->ic_callback_lock);
 850        if (commit_iclog->ic_state == XLOG_STATE_IOERROR) {
 851                spin_unlock(&commit_iclog->ic_callback_lock);
 852                goto out_abort;
 853        }
 854        ASSERT_ALWAYS(commit_iclog->ic_state == XLOG_STATE_ACTIVE ||
 855                      commit_iclog->ic_state == XLOG_STATE_WANT_SYNC);
 856        list_add_tail(&ctx->iclog_entry, &commit_iclog->ic_callbacks);
 857        spin_unlock(&commit_iclog->ic_callback_lock);
 858
 859        /*
 860         * now the checkpoint commit is complete and we've attached the
 861         * callbacks to the iclog we can assign the commit LSN to the context
 862         * and wake up anyone who is waiting for the commit to complete.
 863         */
 864        spin_lock(&cil->xc_push_lock);
 865        ctx->commit_lsn = commit_lsn;
 866        wake_up_all(&cil->xc_commit_wait);
 867        spin_unlock(&cil->xc_push_lock);
 868
 869        /* release the hounds! */
 870        return xfs_log_release_iclog(log->l_mp, commit_iclog);
 871
 872out_skip:
 873        up_write(&cil->xc_ctx_lock);
 874        xfs_log_ticket_put(new_ctx->ticket);
 875        kmem_free(new_ctx);
 876        return 0;
 877
 878out_abort_free_ticket:
 879        xfs_log_ticket_put(tic);
 880out_abort:
 881        xlog_cil_committed(ctx, true);
 882        return -EIO;
 883}
 884
 885static void
 886xlog_cil_push_work(
 887        struct work_struct      *work)
 888{
 889        struct xfs_cil          *cil = container_of(work, struct xfs_cil,
 890                                                        xc_push_work);
 891        xlog_cil_push(cil->xc_log);
 892}
 893
 894/*
 895 * We need to push CIL every so often so we don't cache more than we can fit in
 896 * the log. The limit really is that a checkpoint can't be more than half the
 897 * log (the current checkpoint is not allowed to overwrite the previous
 898 * checkpoint), but commit latency and memory usage limit this to a smaller
 899 * size.
 900 */
 901static void
 902xlog_cil_push_background(
 903        struct xlog     *log)
 904{
 905        struct xfs_cil  *cil = log->l_cilp;
 906
 907        /*
 908         * The cil won't be empty because we are called while holding the
 909         * context lock so whatever we added to the CIL will still be there
 910         */
 911        ASSERT(!list_empty(&cil->xc_cil));
 912
 913        /*
 914         * don't do a background push if we haven't used up all the
 915         * space available yet.
 916         */
 917        if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
 918                return;
 919
 920        spin_lock(&cil->xc_push_lock);
 921        if (cil->xc_push_seq < cil->xc_current_sequence) {
 922                cil->xc_push_seq = cil->xc_current_sequence;
 923                queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
 924        }
 925        spin_unlock(&cil->xc_push_lock);
 926
 927}
 928
 929/*
 930 * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
 931 * number that is passed. When it returns, the work will be queued for
 932 * @push_seq, but it won't be completed. The caller is expected to do any
 933 * waiting for push_seq to complete if it is required.
 934 */
 935static void
 936xlog_cil_push_now(
 937        struct xlog     *log,
 938        xfs_lsn_t       push_seq)
 939{
 940        struct xfs_cil  *cil = log->l_cilp;
 941
 942        if (!cil)
 943                return;
 944
 945        ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
 946
 947        /* start on any pending background push to minimise wait time on it */
 948        flush_work(&cil->xc_push_work);
 949
 950        /*
 951         * If the CIL is empty or we've already pushed the sequence then
 952         * there's no work we need to do.
 953         */
 954        spin_lock(&cil->xc_push_lock);
 955        if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
 956                spin_unlock(&cil->xc_push_lock);
 957                return;
 958        }
 959
 960        cil->xc_push_seq = push_seq;
 961        queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
 962        spin_unlock(&cil->xc_push_lock);
 963}
 964
 965bool
 966xlog_cil_empty(
 967        struct xlog     *log)
 968{
 969        struct xfs_cil  *cil = log->l_cilp;
 970        bool            empty = false;
 971
 972        spin_lock(&cil->xc_push_lock);
 973        if (list_empty(&cil->xc_cil))
 974                empty = true;
 975        spin_unlock(&cil->xc_push_lock);
 976        return empty;
 977}
 978
 979/*
 980 * Commit a transaction with the given vector to the Committed Item List.
 981 *
 982 * To do this, we need to format the item, pin it in memory if required and
 983 * account for the space used by the transaction. Once we have done that we
 984 * need to release the unused reservation for the transaction, attach the
 985 * transaction to the checkpoint context so we carry the busy extents through
 986 * to checkpoint completion, and then unlock all the items in the transaction.
 987 *
 988 * Called with the context lock already held in read mode to lock out
 989 * background commit, returns without it held once background commits are
 990 * allowed again.
 991 */
 992void
 993xfs_log_commit_cil(
 994        struct xfs_mount        *mp,
 995        struct xfs_trans        *tp,
 996        xfs_lsn_t               *commit_lsn,
 997        bool                    regrant)
 998{
 999        struct xlog             *log = mp->m_log;
1000        struct xfs_cil          *cil = log->l_cilp;
1001        struct xfs_log_item     *lip, *next;
1002        xfs_lsn_t               xc_commit_lsn;
1003
1004        /*
1005         * Do all necessary memory allocation before we lock the CIL.
1006         * This ensures the allocation does not deadlock with a CIL
1007         * push in memory reclaim (e.g. from kswapd).
1008         */
1009        xlog_cil_alloc_shadow_bufs(log, tp);
1010
1011        /* lock out background commit */
1012        down_read(&cil->xc_ctx_lock);
1013
1014        xlog_cil_insert_items(log, tp);
1015
1016        xc_commit_lsn = cil->xc_ctx->sequence;
1017        if (commit_lsn)
1018                *commit_lsn = xc_commit_lsn;
1019
1020        xfs_log_done(mp, tp->t_ticket, NULL, regrant);
1021        tp->t_ticket = NULL;
1022        xfs_trans_unreserve_and_mod_sb(tp);
1023
1024        /*
1025         * Once all the items of the transaction have been copied to the CIL,
1026         * the items can be unlocked and possibly freed.
1027         *
1028         * This needs to be done before we drop the CIL context lock because we
1029         * have to update state in the log items and unlock them before they go
1030         * to disk. If we don't, then the CIL checkpoint can race with us and
1031         * we can run checkpoint completion before we've updated and unlocked
1032         * the log items. This affects (at least) processing of stale buffers,
1033         * inodes and EFIs.
1034         */
1035        trace_xfs_trans_commit_items(tp, _RET_IP_);
1036        list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
1037                xfs_trans_del_item(lip);
1038                if (lip->li_ops->iop_committing)
1039                        lip->li_ops->iop_committing(lip, xc_commit_lsn);
1040        }
1041        xlog_cil_push_background(log);
1042
1043        up_read(&cil->xc_ctx_lock);
1044}
1045
1046/*
1047 * Conditionally push the CIL based on the sequence passed in.
1048 *
1049 * We only need to push if we haven't already pushed the sequence
1050 * number given. Hence the only time we will trigger a push here is
1051 * if the push sequence is the same as the current context.
1052 *
1053 * We return the current commit lsn to allow the callers to determine if a
1054 * iclog flush is necessary following this call.
1055 */
1056xfs_lsn_t
1057xlog_cil_force_lsn(
1058        struct xlog     *log,
1059        xfs_lsn_t       sequence)
1060{
1061        struct xfs_cil          *cil = log->l_cilp;
1062        struct xfs_cil_ctx      *ctx;
1063        xfs_lsn_t               commit_lsn = NULLCOMMITLSN;
1064
1065        ASSERT(sequence <= cil->xc_current_sequence);
1066
1067        /*
1068         * check to see if we need to force out the current context.
1069         * xlog_cil_push() handles racing pushes for the same sequence,
1070         * so no need to deal with it here.
1071         */
1072restart:
1073        xlog_cil_push_now(log, sequence);
1074
1075        /*
1076         * See if we can find a previous sequence still committing.
1077         * We need to wait for all previous sequence commits to complete
1078         * before allowing the force of push_seq to go ahead. Hence block
1079         * on commits for those as well.
1080         */
1081        spin_lock(&cil->xc_push_lock);
1082        list_for_each_entry(ctx, &cil->xc_committing, committing) {
1083                /*
1084                 * Avoid getting stuck in this loop because we were woken by the
1085                 * shutdown, but then went back to sleep once already in the
1086                 * shutdown state.
1087                 */
1088                if (XLOG_FORCED_SHUTDOWN(log))
1089                        goto out_shutdown;
1090                if (ctx->sequence > sequence)
1091                        continue;
1092                if (!ctx->commit_lsn) {
1093                        /*
1094                         * It is still being pushed! Wait for the push to
1095                         * complete, then start again from the beginning.
1096                         */
1097                        xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
1098                        goto restart;
1099                }
1100                if (ctx->sequence != sequence)
1101                        continue;
1102                /* found it! */
1103                commit_lsn = ctx->commit_lsn;
1104        }
1105
1106        /*
1107         * The call to xlog_cil_push_now() executes the push in the background.
1108         * Hence by the time we have got here it our sequence may not have been
1109         * pushed yet. This is true if the current sequence still matches the
1110         * push sequence after the above wait loop and the CIL still contains
1111         * dirty objects. This is guaranteed by the push code first adding the
1112         * context to the committing list before emptying the CIL.
1113         *
1114         * Hence if we don't find the context in the committing list and the
1115         * current sequence number is unchanged then the CIL contents are
1116         * significant.  If the CIL is empty, if means there was nothing to push
1117         * and that means there is nothing to wait for. If the CIL is not empty,
1118         * it means we haven't yet started the push, because if it had started
1119         * we would have found the context on the committing list.
1120         */
1121        if (sequence == cil->xc_current_sequence &&
1122            !list_empty(&cil->xc_cil)) {
1123                spin_unlock(&cil->xc_push_lock);
1124                goto restart;
1125        }
1126
1127        spin_unlock(&cil->xc_push_lock);
1128        return commit_lsn;
1129
1130        /*
1131         * We detected a shutdown in progress. We need to trigger the log force
1132         * to pass through it's iclog state machine error handling, even though
1133         * we are already in a shutdown state. Hence we can't return
1134         * NULLCOMMITLSN here as that has special meaning to log forces (i.e.
1135         * LSN is already stable), so we return a zero LSN instead.
1136         */
1137out_shutdown:
1138        spin_unlock(&cil->xc_push_lock);
1139        return 0;
1140}
1141
1142/*
1143 * Check if the current log item was first committed in this sequence.
1144 * We can't rely on just the log item being in the CIL, we have to check
1145 * the recorded commit sequence number.
1146 *
1147 * Note: for this to be used in a non-racy manner, it has to be called with
1148 * CIL flushing locked out. As a result, it should only be used during the
1149 * transaction commit process when deciding what to format into the item.
1150 */
1151bool
1152xfs_log_item_in_current_chkpt(
1153        struct xfs_log_item *lip)
1154{
1155        struct xfs_cil_ctx *ctx;
1156
1157        if (list_empty(&lip->li_cil))
1158                return false;
1159
1160        ctx = lip->li_mountp->m_log->l_cilp->xc_ctx;
1161
1162        /*
1163         * li_seq is written on the first commit of a log item to record the
1164         * first checkpoint it is written to. Hence if it is different to the
1165         * current sequence, we're in a new checkpoint.
1166         */
1167        if (XFS_LSN_CMP(lip->li_seq, ctx->sequence) != 0)
1168                return false;
1169        return true;
1170}
1171
1172/*
1173 * Perform initial CIL structure initialisation.
1174 */
1175int
1176xlog_cil_init(
1177        struct xlog     *log)
1178{
1179        struct xfs_cil  *cil;
1180        struct xfs_cil_ctx *ctx;
1181
1182        cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
1183        if (!cil)
1184                return -ENOMEM;
1185
1186        ctx = kmem_zalloc(sizeof(*ctx), KM_MAYFAIL);
1187        if (!ctx) {
1188                kmem_free(cil);
1189                return -ENOMEM;
1190        }
1191
1192        INIT_WORK(&cil->xc_push_work, xlog_cil_push_work);
1193        INIT_LIST_HEAD(&cil->xc_cil);
1194        INIT_LIST_HEAD(&cil->xc_committing);
1195        spin_lock_init(&cil->xc_cil_lock);
1196        spin_lock_init(&cil->xc_push_lock);
1197        init_rwsem(&cil->xc_ctx_lock);
1198        init_waitqueue_head(&cil->xc_commit_wait);
1199
1200        INIT_LIST_HEAD(&ctx->committing);
1201        INIT_LIST_HEAD(&ctx->busy_extents);
1202        ctx->sequence = 1;
1203        ctx->cil = cil;
1204        cil->xc_ctx = ctx;
1205        cil->xc_current_sequence = ctx->sequence;
1206
1207        cil->xc_log = log;
1208        log->l_cilp = cil;
1209        return 0;
1210}
1211
1212void
1213xlog_cil_destroy(
1214        struct xlog     *log)
1215{
1216        if (log->l_cilp->xc_ctx) {
1217                if (log->l_cilp->xc_ctx->ticket)
1218                        xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
1219                kmem_free(log->l_cilp->xc_ctx);
1220        }
1221
1222        ASSERT(list_empty(&log->l_cilp->xc_cil));
1223        kmem_free(log->l_cilp);
1224}
1225
1226