linux/drivers/staging/lustre/lustre/mgc/mgc_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2015, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/mgc/mgc_request.c
  37 *
  38 * Author: Nathan Rutman <nathan@clusterfs.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_MGC
  42#define D_MGC D_CONFIG /*|D_WARNING*/
  43
  44#include <linux/module.h>
  45#include "../include/obd_class.h"
  46#include "../include/lustre_dlm.h"
  47#include "../include/lprocfs_status.h"
  48#include "../include/lustre_log.h"
  49#include "../include/lustre_disk.h"
  50
  51#include "mgc_internal.h"
  52
  53static int mgc_name2resid(char *name, int len, struct ldlm_res_id *res_id,
  54                          int type)
  55{
  56        __u64 resname = 0;
  57
  58        if (len > sizeof(resname)) {
  59                CERROR("name too long: %s\n", name);
  60                return -EINVAL;
  61        }
  62        if (len <= 0) {
  63                CERROR("missing name: %s\n", name);
  64                return -EINVAL;
  65        }
  66        memcpy(&resname, name, len);
  67
  68        /* Always use the same endianness for the resid */
  69        memset(res_id, 0, sizeof(*res_id));
  70        res_id->name[0] = cpu_to_le64(resname);
  71        /* XXX: unfortunately, sptlprc and config llog share one lock */
  72        switch (type) {
  73        case CONFIG_T_CONFIG:
  74        case CONFIG_T_SPTLRPC:
  75                resname = 0;
  76                break;
  77        case CONFIG_T_RECOVER:
  78        case CONFIG_T_PARAMS:
  79                resname = type;
  80                break;
  81        default:
  82                LBUG();
  83        }
  84        res_id->name[1] = cpu_to_le64(resname);
  85        CDEBUG(D_MGC, "log %s to resid %#llx/%#llx (%.8s)\n", name,
  86               res_id->name[0], res_id->name[1], (char *)&res_id->name[0]);
  87        return 0;
  88}
  89
  90int mgc_fsname2resid(char *fsname, struct ldlm_res_id *res_id, int type)
  91{
  92        /* fsname is at most 8 chars long, maybe contain "-".
  93         * e.g. "lustre", "SUN-000"
  94         */
  95        return mgc_name2resid(fsname, strlen(fsname), res_id, type);
  96}
  97EXPORT_SYMBOL(mgc_fsname2resid);
  98
  99static int mgc_logname2resid(char *logname, struct ldlm_res_id *res_id, int type)
 100{
 101        char *name_end;
 102        int len;
 103
 104        /* logname consists of "fsname-nodetype".
 105         * e.g. "lustre-MDT0001", "SUN-000-client"
 106         * there is an exception: llog "params"
 107         */
 108        name_end = strrchr(logname, '-');
 109        if (!name_end)
 110                len = strlen(logname);
 111        else
 112                len = name_end - logname;
 113        return mgc_name2resid(logname, len, res_id, type);
 114}
 115
 116/********************** config llog list **********************/
 117static LIST_HEAD(config_llog_list);
 118static DEFINE_SPINLOCK(config_list_lock);
 119
 120/* Take a reference to a config log */
 121static int config_log_get(struct config_llog_data *cld)
 122{
 123        atomic_inc(&cld->cld_refcount);
 124        CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
 125               atomic_read(&cld->cld_refcount));
 126        return 0;
 127}
 128
 129/* Drop a reference to a config log.  When no longer referenced,
 130 * we can free the config log data
 131 */
 132static void config_log_put(struct config_llog_data *cld)
 133{
 134        CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
 135               atomic_read(&cld->cld_refcount));
 136        LASSERT(atomic_read(&cld->cld_refcount) > 0);
 137
 138        /* spinlock to make sure no item with 0 refcount in the list */
 139        if (atomic_dec_and_lock(&cld->cld_refcount, &config_list_lock)) {
 140                list_del(&cld->cld_list_chain);
 141                spin_unlock(&config_list_lock);
 142
 143                CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
 144
 145                if (cld->cld_recover)
 146                        config_log_put(cld->cld_recover);
 147                if (cld->cld_sptlrpc)
 148                        config_log_put(cld->cld_sptlrpc);
 149                if (cld->cld_params)
 150                        config_log_put(cld->cld_params);
 151                if (cld_is_sptlrpc(cld))
 152                        sptlrpc_conf_log_stop(cld->cld_logname);
 153
 154                class_export_put(cld->cld_mgcexp);
 155                kfree(cld);
 156        }
 157}
 158
 159/* Find a config log by name */
 160static
 161struct config_llog_data *config_log_find(char *logname,
 162                                         struct config_llog_instance *cfg)
 163{
 164        struct config_llog_data *cld;
 165        struct config_llog_data *found = NULL;
 166        void *instance;
 167
 168        LASSERT(logname);
 169
 170        instance = cfg ? cfg->cfg_instance : NULL;
 171        spin_lock(&config_list_lock);
 172        list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
 173                /* check if instance equals */
 174                if (instance != cld->cld_cfg.cfg_instance)
 175                        continue;
 176
 177                /* instance may be NULL, should check name */
 178                if (strcmp(logname, cld->cld_logname) == 0) {
 179                        found = cld;
 180                        break;
 181                }
 182        }
 183        if (found) {
 184                atomic_inc(&found->cld_refcount);
 185                LASSERT(found->cld_stopping == 0 || cld_is_sptlrpc(found) == 0);
 186        }
 187        spin_unlock(&config_list_lock);
 188        return found;
 189}
 190
 191static
 192struct config_llog_data *do_config_log_add(struct obd_device *obd,
 193                                           char *logname,
 194                                           int type,
 195                                           struct config_llog_instance *cfg,
 196                                           struct super_block *sb)
 197{
 198        struct config_llog_data *cld;
 199        int                   rc;
 200
 201        CDEBUG(D_MGC, "do adding config log %s:%p\n", logname,
 202               cfg ? cfg->cfg_instance : NULL);
 203
 204        cld = kzalloc(sizeof(*cld) + strlen(logname) + 1, GFP_NOFS);
 205        if (!cld)
 206                return ERR_PTR(-ENOMEM);
 207
 208        strcpy(cld->cld_logname, logname);
 209        if (cfg)
 210                cld->cld_cfg = *cfg;
 211        else
 212                cld->cld_cfg.cfg_callback = class_config_llog_handler;
 213        mutex_init(&cld->cld_lock);
 214        cld->cld_cfg.cfg_last_idx = 0;
 215        cld->cld_cfg.cfg_flags = 0;
 216        cld->cld_cfg.cfg_sb = sb;
 217        cld->cld_type = type;
 218        atomic_set(&cld->cld_refcount, 1);
 219
 220        /* Keep the mgc around until we are done */
 221        cld->cld_mgcexp = class_export_get(obd->obd_self_export);
 222
 223        if (cld_is_sptlrpc(cld)) {
 224                sptlrpc_conf_log_start(logname);
 225                cld->cld_cfg.cfg_obdname = obd->obd_name;
 226        }
 227
 228        rc = mgc_logname2resid(logname, &cld->cld_resid, type);
 229
 230        spin_lock(&config_list_lock);
 231        list_add(&cld->cld_list_chain, &config_llog_list);
 232        spin_unlock(&config_list_lock);
 233
 234        if (rc) {
 235                config_log_put(cld);
 236                return ERR_PTR(rc);
 237        }
 238
 239        if (cld_is_sptlrpc(cld)) {
 240                rc = mgc_process_log(obd, cld);
 241                if (rc && rc != -ENOENT)
 242                        CERROR("failed processing sptlrpc log: %d\n", rc);
 243        }
 244
 245        return cld;
 246}
 247
 248static struct config_llog_data *
 249config_recover_log_add(struct obd_device *obd, char *fsname,
 250                       struct config_llog_instance *cfg,
 251                       struct super_block *sb)
 252{
 253        struct config_llog_instance lcfg = *cfg;
 254        struct config_llog_data *cld;
 255        char logname[32];
 256
 257        /* we have to use different llog for clients and mdts for cmd
 258         * where only clients are notified if one of cmd server restarts
 259         */
 260        LASSERT(strlen(fsname) < sizeof(logname) / 2);
 261        strcpy(logname, fsname);
 262        LASSERT(lcfg.cfg_instance);
 263        strcat(logname, "-cliir");
 264
 265        cld = do_config_log_add(obd, logname, CONFIG_T_RECOVER, &lcfg, sb);
 266        return cld;
 267}
 268
 269static struct config_llog_data *
 270config_params_log_add(struct obd_device *obd,
 271                      struct config_llog_instance *cfg, struct super_block *sb)
 272{
 273        struct config_llog_instance     lcfg = *cfg;
 274        struct config_llog_data         *cld;
 275
 276        lcfg.cfg_instance = sb;
 277
 278        cld = do_config_log_add(obd, PARAMS_FILENAME, CONFIG_T_PARAMS,
 279                                &lcfg, sb);
 280
 281        return cld;
 282}
 283
 284/** Add this log to the list of active logs watched by an MGC.
 285 * Active means we're watching for updates.
 286 * We have one active log per "mount" - client instance or servername.
 287 * Each instance may be at a different point in the log.
 288 */
 289static int config_log_add(struct obd_device *obd, char *logname,
 290                          struct config_llog_instance *cfg,
 291                          struct super_block *sb)
 292{
 293        struct lustre_sb_info *lsi = s2lsi(sb);
 294        struct config_llog_data *cld;
 295        struct config_llog_data *sptlrpc_cld;
 296        struct config_llog_data *params_cld;
 297        char                    seclogname[32];
 298        char                    *ptr;
 299        int                     rc;
 300
 301        CDEBUG(D_MGC, "adding config log %s:%p\n", logname, cfg->cfg_instance);
 302
 303        /*
 304         * for each regular log, the depended sptlrpc log name is
 305         * <fsname>-sptlrpc. multiple regular logs may share one sptlrpc log.
 306         */
 307        ptr = strrchr(logname, '-');
 308        if (!ptr || ptr - logname > 8) {
 309                CERROR("logname %s is too long\n", logname);
 310                return -EINVAL;
 311        }
 312
 313        memcpy(seclogname, logname, ptr - logname);
 314        strcpy(seclogname + (ptr - logname), "-sptlrpc");
 315
 316        sptlrpc_cld = config_log_find(seclogname, NULL);
 317        if (!sptlrpc_cld) {
 318                sptlrpc_cld = do_config_log_add(obd, seclogname,
 319                                                CONFIG_T_SPTLRPC, NULL, NULL);
 320                if (IS_ERR(sptlrpc_cld)) {
 321                        CERROR("can't create sptlrpc log: %s\n", seclogname);
 322                        rc = PTR_ERR(sptlrpc_cld);
 323                        goto out_err;
 324                }
 325        }
 326        params_cld = config_params_log_add(obd, cfg, sb);
 327        if (IS_ERR(params_cld)) {
 328                rc = PTR_ERR(params_cld);
 329                CERROR("%s: can't create params log: rc = %d\n",
 330                       obd->obd_name, rc);
 331                goto out_err1;
 332        }
 333
 334        cld = do_config_log_add(obd, logname, CONFIG_T_CONFIG, cfg, sb);
 335        if (IS_ERR(cld)) {
 336                CERROR("can't create log: %s\n", logname);
 337                rc = PTR_ERR(cld);
 338                goto out_err2;
 339        }
 340
 341        cld->cld_sptlrpc = sptlrpc_cld;
 342        cld->cld_params = params_cld;
 343
 344        LASSERT(lsi->lsi_lmd);
 345        if (!(lsi->lsi_lmd->lmd_flags & LMD_FLG_NOIR)) {
 346                struct config_llog_data *recover_cld;
 347
 348                ptr = strrchr(seclogname, '-');
 349                if (ptr) {
 350                        *ptr = 0;
 351                } else {
 352                        CERROR("%s: sptlrpc log name not correct, %s: rc = %d\n",
 353                               obd->obd_name, seclogname, -EINVAL);
 354                        config_log_put(cld);
 355                        return -EINVAL;
 356                }
 357                recover_cld = config_recover_log_add(obd, seclogname, cfg, sb);
 358                if (IS_ERR(recover_cld)) {
 359                        rc = PTR_ERR(recover_cld);
 360                        goto out_err3;
 361                }
 362                cld->cld_recover = recover_cld;
 363        }
 364
 365        return 0;
 366
 367out_err3:
 368        config_log_put(cld);
 369
 370out_err2:
 371        config_log_put(params_cld);
 372
 373out_err1:
 374        config_log_put(sptlrpc_cld);
 375
 376out_err:
 377        return rc;
 378}
 379
 380DEFINE_MUTEX(llog_process_lock);
 381
 382/** Stop watching for updates on this log.
 383 */
 384static int config_log_end(char *logname, struct config_llog_instance *cfg)
 385{
 386        struct config_llog_data *cld;
 387        struct config_llog_data *cld_sptlrpc = NULL;
 388        struct config_llog_data *cld_params = NULL;
 389        struct config_llog_data *cld_recover = NULL;
 390        int rc = 0;
 391
 392        cld = config_log_find(logname, cfg);
 393        if (!cld)
 394                return -ENOENT;
 395
 396        mutex_lock(&cld->cld_lock);
 397        /*
 398         * if cld_stopping is set, it means we didn't start the log thus
 399         * not owning the start ref. this can happen after previous umount:
 400         * the cld still hanging there waiting for lock cancel, and we
 401         * remount again but failed in the middle and call log_end without
 402         * calling start_log.
 403         */
 404        if (unlikely(cld->cld_stopping)) {
 405                mutex_unlock(&cld->cld_lock);
 406                /* drop the ref from the find */
 407                config_log_put(cld);
 408                return rc;
 409        }
 410
 411        cld->cld_stopping = 1;
 412
 413        cld_recover = cld->cld_recover;
 414        cld->cld_recover = NULL;
 415        mutex_unlock(&cld->cld_lock);
 416
 417        if (cld_recover) {
 418                mutex_lock(&cld_recover->cld_lock);
 419                cld_recover->cld_stopping = 1;
 420                mutex_unlock(&cld_recover->cld_lock);
 421                config_log_put(cld_recover);
 422        }
 423
 424        spin_lock(&config_list_lock);
 425        cld_sptlrpc = cld->cld_sptlrpc;
 426        cld->cld_sptlrpc = NULL;
 427        cld_params = cld->cld_params;
 428        cld->cld_params = NULL;
 429        spin_unlock(&config_list_lock);
 430
 431        if (cld_sptlrpc)
 432                config_log_put(cld_sptlrpc);
 433
 434        if (cld_params) {
 435                mutex_lock(&cld_params->cld_lock);
 436                cld_params->cld_stopping = 1;
 437                mutex_unlock(&cld_params->cld_lock);
 438                config_log_put(cld_params);
 439        }
 440
 441        /* drop the ref from the find */
 442        config_log_put(cld);
 443        /* drop the start ref */
 444        config_log_put(cld);
 445
 446        CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client",
 447               rc);
 448        return rc;
 449}
 450
 451int lprocfs_mgc_rd_ir_state(struct seq_file *m, void *data)
 452{
 453        struct obd_device       *obd = data;
 454        struct obd_import       *imp;
 455        struct obd_connect_data *ocd;
 456        struct config_llog_data *cld;
 457        int rc;
 458
 459        rc = lprocfs_climp_check(obd);
 460        if (rc)
 461                return rc;
 462
 463        imp = obd->u.cli.cl_import;
 464        ocd = &imp->imp_connect_data;
 465
 466        seq_printf(m, "imperative_recovery: %s\n",
 467                   OCD_HAS_FLAG(ocd, IMP_RECOV) ? "ENABLED" : "DISABLED");
 468        seq_printf(m, "client_state:\n");
 469
 470        spin_lock(&config_list_lock);
 471        list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
 472                if (!cld->cld_recover)
 473                        continue;
 474                seq_printf(m, "    - { client: %s, nidtbl_version: %u }\n",
 475                           cld->cld_logname,
 476                           cld->cld_recover->cld_cfg.cfg_last_idx);
 477        }
 478        spin_unlock(&config_list_lock);
 479
 480        up_read(&obd->u.cli.cl_sem);
 481        return 0;
 482}
 483
 484/* reenqueue any lost locks */
 485#define RQ_RUNNING 0x1
 486#define RQ_NOW     0x2
 487#define RQ_LATER   0x4
 488#define RQ_STOP    0x8
 489#define RQ_PRECLEANUP  0x10
 490static int rq_state;
 491static wait_queue_head_t            rq_waitq;
 492static DECLARE_COMPLETION(rq_exit);
 493static DECLARE_COMPLETION(rq_start);
 494
 495static void do_requeue(struct config_llog_data *cld)
 496{
 497        LASSERT(atomic_read(&cld->cld_refcount) > 0);
 498
 499        /* Do not run mgc_process_log on a disconnected export or an
 500         * export which is being disconnected. Take the client
 501         * semaphore to make the check non-racy.
 502         */
 503        down_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem);
 504        if (cld->cld_mgcexp->exp_obd->u.cli.cl_conn_count != 0) {
 505                CDEBUG(D_MGC, "updating log %s\n", cld->cld_logname);
 506                mgc_process_log(cld->cld_mgcexp->exp_obd, cld);
 507        } else {
 508                CDEBUG(D_MGC, "disconnecting, won't update log %s\n",
 509                       cld->cld_logname);
 510        }
 511        up_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem);
 512}
 513
 514/* this timeout represents how many seconds MGC should wait before
 515 * requeue config and recover lock to the MGS. We need to randomize this
 516 * in order to not flood the MGS.
 517 */
 518#define MGC_TIMEOUT_MIN_SECONDS   5
 519#define MGC_TIMEOUT_RAND_CENTISEC 0x1ff /* ~500 */
 520
 521static int mgc_requeue_thread(void *data)
 522{
 523        bool first = true;
 524
 525        CDEBUG(D_MGC, "Starting requeue thread\n");
 526
 527        /* Keep trying failed locks periodically */
 528        spin_lock(&config_list_lock);
 529        rq_state |= RQ_RUNNING;
 530        while (1) {
 531                struct l_wait_info lwi;
 532                struct config_llog_data *cld, *cld_prev;
 533                int rand = cfs_rand() & MGC_TIMEOUT_RAND_CENTISEC;
 534                int stopped = !!(rq_state & RQ_STOP);
 535                int to;
 536
 537                /* Any new or requeued lostlocks will change the state */
 538                rq_state &= ~(RQ_NOW | RQ_LATER);
 539                spin_unlock(&config_list_lock);
 540
 541                if (first) {
 542                        first = false;
 543                        complete(&rq_start);
 544                }
 545
 546                /* Always wait a few seconds to allow the server who
 547                 * caused the lock revocation to finish its setup, plus some
 548                 * random so everyone doesn't try to reconnect at once.
 549                 */
 550                to = MGC_TIMEOUT_MIN_SECONDS * HZ;
 551                to += rand * HZ / 100; /* rand is centi-seconds */
 552                lwi = LWI_TIMEOUT(to, NULL, NULL);
 553                l_wait_event(rq_waitq, rq_state & (RQ_STOP | RQ_PRECLEANUP),
 554                             &lwi);
 555
 556                /*
 557                 * iterate & processing through the list. for each cld, process
 558                 * its depending sptlrpc cld firstly (if any) and then itself.
 559                 *
 560                 * it's guaranteed any item in the list must have
 561                 * reference > 0; and if cld_lostlock is set, at
 562                 * least one reference is taken by the previous enqueue.
 563                 */
 564                cld_prev = NULL;
 565
 566                spin_lock(&config_list_lock);
 567                rq_state &= ~RQ_PRECLEANUP;
 568                list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
 569                        if (!cld->cld_lostlock)
 570                                continue;
 571
 572                        spin_unlock(&config_list_lock);
 573
 574                        LASSERT(atomic_read(&cld->cld_refcount) > 0);
 575
 576                        /* Whether we enqueued again or not in mgc_process_log,
 577                         * we're done with the ref from the old enqueue
 578                         */
 579                        if (cld_prev)
 580                                config_log_put(cld_prev);
 581                        cld_prev = cld;
 582
 583                        cld->cld_lostlock = 0;
 584                        if (likely(!stopped))
 585                                do_requeue(cld);
 586
 587                        spin_lock(&config_list_lock);
 588                }
 589                spin_unlock(&config_list_lock);
 590                if (cld_prev)
 591                        config_log_put(cld_prev);
 592
 593                /* break after scanning the list so that we can drop
 594                 * refcount to losing lock clds
 595                 */
 596                if (unlikely(stopped)) {
 597                        spin_lock(&config_list_lock);
 598                        break;
 599                }
 600
 601                /* Wait a bit to see if anyone else needs a requeue */
 602                lwi = (struct l_wait_info) { 0 };
 603                l_wait_event(rq_waitq, rq_state & (RQ_NOW | RQ_STOP),
 604                             &lwi);
 605                spin_lock(&config_list_lock);
 606        }
 607        /* spinlock and while guarantee RQ_NOW and RQ_LATER are not set */
 608        rq_state &= ~RQ_RUNNING;
 609        spin_unlock(&config_list_lock);
 610
 611        complete(&rq_exit);
 612
 613        CDEBUG(D_MGC, "Ending requeue thread\n");
 614        return 0;
 615}
 616
 617/* Add a cld to the list to requeue.  Start the requeue thread if needed.
 618 * We are responsible for dropping the config log reference from here on out.
 619 */
 620static void mgc_requeue_add(struct config_llog_data *cld)
 621{
 622        CDEBUG(D_INFO, "log %s: requeue (r=%d sp=%d st=%x)\n",
 623               cld->cld_logname, atomic_read(&cld->cld_refcount),
 624               cld->cld_stopping, rq_state);
 625        LASSERT(atomic_read(&cld->cld_refcount) > 0);
 626
 627        mutex_lock(&cld->cld_lock);
 628        if (cld->cld_stopping || cld->cld_lostlock) {
 629                mutex_unlock(&cld->cld_lock);
 630                return;
 631        }
 632        /* this refcount will be released in mgc_requeue_thread. */
 633        config_log_get(cld);
 634        cld->cld_lostlock = 1;
 635        mutex_unlock(&cld->cld_lock);
 636
 637        /* Hold lock for rq_state */
 638        spin_lock(&config_list_lock);
 639        if (rq_state & RQ_STOP) {
 640                spin_unlock(&config_list_lock);
 641                cld->cld_lostlock = 0;
 642                config_log_put(cld);
 643        } else {
 644                rq_state |= RQ_NOW;
 645                spin_unlock(&config_list_lock);
 646                wake_up(&rq_waitq);
 647        }
 648}
 649
 650static int mgc_llog_init(const struct lu_env *env, struct obd_device *obd)
 651{
 652        struct llog_ctxt        *ctxt;
 653        int                      rc;
 654
 655        /* setup only remote ctxt, the local disk context is switched per each
 656         * filesystem during mgc_fs_setup()
 657         */
 658        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, obd,
 659                        &llog_client_ops);
 660        if (rc)
 661                return rc;
 662
 663        ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
 664        LASSERT(ctxt);
 665
 666        llog_initiator_connect(ctxt);
 667        llog_ctxt_put(ctxt);
 668
 669        return 0;
 670}
 671
 672static int mgc_llog_fini(const struct lu_env *env, struct obd_device *obd)
 673{
 674        struct llog_ctxt *ctxt;
 675
 676        ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
 677        if (ctxt)
 678                llog_cleanup(env, ctxt);
 679
 680        return 0;
 681}
 682
 683static atomic_t mgc_count = ATOMIC_INIT(0);
 684static int mgc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 685{
 686        int rc = 0;
 687        int temp;
 688
 689        switch (stage) {
 690        case OBD_CLEANUP_EARLY:
 691                break;
 692        case OBD_CLEANUP_EXPORTS:
 693                if (atomic_dec_and_test(&mgc_count)) {
 694                        LASSERT(rq_state & RQ_RUNNING);
 695                        /* stop requeue thread */
 696                        temp = RQ_STOP;
 697                } else {
 698                        /* wakeup requeue thread to clean our cld */
 699                        temp = RQ_NOW | RQ_PRECLEANUP;
 700                }
 701                spin_lock(&config_list_lock);
 702                rq_state |= temp;
 703                spin_unlock(&config_list_lock);
 704                wake_up(&rq_waitq);
 705                if (temp & RQ_STOP)
 706                        wait_for_completion(&rq_exit);
 707                obd_cleanup_client_import(obd);
 708                rc = mgc_llog_fini(NULL, obd);
 709                if (rc != 0)
 710                        CERROR("failed to cleanup llogging subsystems\n");
 711                break;
 712        }
 713        return rc;
 714}
 715
 716static int mgc_cleanup(struct obd_device *obd)
 717{
 718        /* COMPAT_146 - old config logs may have added profiles we don't
 719         * know about
 720         */
 721        if (obd->obd_type->typ_refcnt <= 1)
 722                /* Only for the last mgc */
 723                class_del_profiles();
 724
 725        lprocfs_obd_cleanup(obd);
 726        ptlrpcd_decref();
 727
 728        return client_obd_cleanup(obd);
 729}
 730
 731static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 732{
 733        struct lprocfs_static_vars lvars = { NULL };
 734        struct task_struct *task;
 735        int rc;
 736
 737        ptlrpcd_addref();
 738
 739        rc = client_obd_setup(obd, lcfg);
 740        if (rc)
 741                goto err_decref;
 742
 743        rc = mgc_llog_init(NULL, obd);
 744        if (rc) {
 745                CERROR("failed to setup llogging subsystems\n");
 746                goto err_cleanup;
 747        }
 748
 749        lprocfs_mgc_init_vars(&lvars);
 750        lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
 751        sptlrpc_lprocfs_cliobd_attach(obd);
 752
 753        if (atomic_inc_return(&mgc_count) == 1) {
 754                rq_state = 0;
 755                init_waitqueue_head(&rq_waitq);
 756
 757                /* start requeue thread */
 758                task = kthread_run(mgc_requeue_thread, NULL, "ll_cfg_requeue");
 759                if (IS_ERR(task)) {
 760                        rc = PTR_ERR(task);
 761                        CERROR("%s: cannot start requeue thread: rc = %d; no more log updates\n",
 762                               obd->obd_name, rc);
 763                        goto err_cleanup;
 764                }
 765                /* rc is the task_struct pointer of mgc_requeue_thread. */
 766                rc = 0;
 767                wait_for_completion(&rq_start);
 768        }
 769
 770        return rc;
 771
 772err_cleanup:
 773        client_obd_cleanup(obd);
 774err_decref:
 775        ptlrpcd_decref();
 776        return rc;
 777}
 778
 779/* based on ll_mdc_blocking_ast */
 780static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 781                            void *data, int flag)
 782{
 783        struct lustre_handle lockh;
 784        struct config_llog_data *cld = data;
 785        int rc = 0;
 786
 787        switch (flag) {
 788        case LDLM_CB_BLOCKING:
 789                /* mgs wants the lock, give it up... */
 790                LDLM_DEBUG(lock, "MGC blocking CB");
 791                ldlm_lock2handle(lock, &lockh);
 792                rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
 793                break;
 794        case LDLM_CB_CANCELING:
 795                /* We've given up the lock, prepare ourselves to update. */
 796                LDLM_DEBUG(lock, "MGC cancel CB");
 797
 798                CDEBUG(D_MGC, "Lock res "DLDLMRES" (%.8s)\n",
 799                       PLDLMRES(lock->l_resource),
 800                       (char *)&lock->l_resource->lr_name.name[0]);
 801
 802                if (!cld) {
 803                        CDEBUG(D_INFO, "missing data, won't requeue\n");
 804                        break;
 805                }
 806
 807                /* held at mgc_process_log(). */
 808                LASSERT(atomic_read(&cld->cld_refcount) > 0);
 809                /* Are we done with this log? */
 810                if (cld->cld_stopping) {
 811                        CDEBUG(D_MGC, "log %s: stopping, won't requeue\n",
 812                               cld->cld_logname);
 813                        config_log_put(cld);
 814                        break;
 815                }
 816                /* Make sure not to re-enqueue when the mgc is stopping
 817                 * (we get called from client_disconnect_export)
 818                 */
 819                if (!lock->l_conn_export ||
 820                    !lock->l_conn_export->exp_obd->u.cli.cl_conn_count) {
 821                        CDEBUG(D_MGC, "log %.8s: disconnecting, won't requeue\n",
 822                               cld->cld_logname);
 823                        config_log_put(cld);
 824                        break;
 825                }
 826
 827                /* Re-enqueue now */
 828                mgc_requeue_add(cld);
 829                config_log_put(cld);
 830                break;
 831        default:
 832                LBUG();
 833        }
 834
 835        return rc;
 836}
 837
 838/* Not sure where this should go... */
 839/* This is the timeout value for MGS_CONNECT request plus a ping interval, such
 840 * that we can have a chance to try the secondary MGS if any.
 841 */
 842#define  MGC_ENQUEUE_LIMIT (INITIAL_CONNECT_TIMEOUT + (AT_OFF ? 0 : at_min) \
 843                                + PING_INTERVAL)
 844#define  MGC_TARGET_REG_LIMIT 10
 845#define  MGC_SEND_PARAM_LIMIT 10
 846
 847/* Send parameter to MGS*/
 848static int mgc_set_mgs_param(struct obd_export *exp,
 849                             struct mgs_send_param *msp)
 850{
 851        struct ptlrpc_request *req;
 852        struct mgs_send_param *req_msp, *rep_msp;
 853        int rc;
 854
 855        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
 856                                        &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION,
 857                                        MGS_SET_INFO);
 858        if (!req)
 859                return -ENOMEM;
 860
 861        req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
 862        if (!req_msp) {
 863                ptlrpc_req_finished(req);
 864                return -ENOMEM;
 865        }
 866
 867        memcpy(req_msp, msp, sizeof(*req_msp));
 868        ptlrpc_request_set_replen(req);
 869
 870        /* Limit how long we will wait for the enqueue to complete */
 871        req->rq_delay_limit = MGC_SEND_PARAM_LIMIT;
 872        rc = ptlrpc_queue_wait(req);
 873        if (!rc) {
 874                rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
 875                memcpy(msp, rep_msp, sizeof(*rep_msp));
 876        }
 877
 878        ptlrpc_req_finished(req);
 879
 880        return rc;
 881}
 882
 883/* Take a config lock so we can get cancel notifications */
 884static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
 885                       __u32 type, ldlm_policy_data_t *policy, __u32 mode,
 886                       __u64 *flags, void *bl_cb, void *cp_cb, void *gl_cb,
 887                       void *data, __u32 lvb_len, void *lvb_swabber,
 888                       struct lustre_handle *lockh)
 889{
 890        struct config_llog_data *cld = data;
 891        struct ldlm_enqueue_info einfo = {
 892                .ei_type        = type,
 893                .ei_mode        = mode,
 894                .ei_cb_bl       = mgc_blocking_ast,
 895                .ei_cb_cp       = ldlm_completion_ast,
 896        };
 897        struct ptlrpc_request *req;
 898        int short_limit = cld_is_sptlrpc(cld);
 899        int rc;
 900
 901        CDEBUG(D_MGC, "Enqueue for %s (res %#llx)\n", cld->cld_logname,
 902               cld->cld_resid.name[0]);
 903
 904        /* We need a callback for every lockholder, so don't try to
 905         * ldlm_lock_match (see rev 1.1.2.11.2.47)
 906         */
 907        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
 908                                        &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION,
 909                                        LDLM_ENQUEUE);
 910        if (!req)
 911                return -ENOMEM;
 912
 913        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 0);
 914        ptlrpc_request_set_replen(req);
 915
 916        /* Limit how long we will wait for the enqueue to complete */
 917        req->rq_delay_limit = short_limit ? 5 : MGC_ENQUEUE_LIMIT;
 918        rc = ldlm_cli_enqueue(exp, &req, &einfo, &cld->cld_resid, NULL, flags,
 919                              NULL, 0, LVB_T_NONE, lockh, 0);
 920        /* A failed enqueue should still call the mgc_blocking_ast,
 921         * where it will be requeued if needed ("grant failed").
 922         */
 923        ptlrpc_req_finished(req);
 924        return rc;
 925}
 926
 927static void mgc_notify_active(struct obd_device *unused)
 928{
 929        /* wakeup mgc_requeue_thread to requeue mgc lock */
 930        spin_lock(&config_list_lock);
 931        rq_state |= RQ_NOW;
 932        spin_unlock(&config_list_lock);
 933        wake_up(&rq_waitq);
 934
 935        /* TODO: Help the MGS rebuild nidtbl. -jay */
 936}
 937
 938/* Send target_reg message to MGS */
 939static int mgc_target_register(struct obd_export *exp,
 940                               struct mgs_target_info *mti)
 941{
 942        struct ptlrpc_request  *req;
 943        struct mgs_target_info *req_mti, *rep_mti;
 944        int                  rc;
 945
 946        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
 947                                        &RQF_MGS_TARGET_REG, LUSTRE_MGS_VERSION,
 948                                        MGS_TARGET_REG);
 949        if (!req)
 950                return -ENOMEM;
 951
 952        req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
 953        if (!req_mti) {
 954                ptlrpc_req_finished(req);
 955                return -ENOMEM;
 956        }
 957
 958        memcpy(req_mti, mti, sizeof(*req_mti));
 959        ptlrpc_request_set_replen(req);
 960        CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
 961        /* Limit how long we will wait for the enqueue to complete */
 962        req->rq_delay_limit = MGC_TARGET_REG_LIMIT;
 963
 964        rc = ptlrpc_queue_wait(req);
 965        if (!rc) {
 966                rep_mti = req_capsule_server_get(&req->rq_pill,
 967                                                 &RMF_MGS_TARGET_INFO);
 968                memcpy(mti, rep_mti, sizeof(*rep_mti));
 969                CDEBUG(D_MGC, "register %s got index = %d\n",
 970                       mti->mti_svname, mti->mti_stripe_index);
 971        }
 972        ptlrpc_req_finished(req);
 973
 974        return rc;
 975}
 976
 977static int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 978                              u32 keylen, void *key, u32 vallen,
 979                              void *val, struct ptlrpc_request_set *set)
 980{
 981        int rc = -EINVAL;
 982
 983        /* Turn off initial_recov after we try all backup servers once */
 984        if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
 985                struct obd_import *imp = class_exp2cliimp(exp);
 986                int value;
 987
 988                if (vallen != sizeof(int))
 989                        return -EINVAL;
 990                value = *(int *)val;
 991                CDEBUG(D_MGC, "InitRecov %s %d/d%d:i%d:r%d:or%d:%s\n",
 992                       imp->imp_obd->obd_name, value,
 993                       imp->imp_deactive, imp->imp_invalid,
 994                       imp->imp_replayable, imp->imp_obd->obd_replayable,
 995                       ptlrpc_import_state_name(imp->imp_state));
 996                /* Resurrect if we previously died */
 997                if ((imp->imp_state != LUSTRE_IMP_FULL &&
 998                     imp->imp_state != LUSTRE_IMP_NEW) || value > 1)
 999                        ptlrpc_reconnect_import(imp);
1000                return 0;
1001        }
1002        if (KEY_IS(KEY_SET_INFO)) {
1003                struct mgs_send_param *msp;
1004
1005                msp = val;
1006                rc =  mgc_set_mgs_param(exp, msp);
1007                return rc;
1008        }
1009        if (KEY_IS(KEY_MGSSEC)) {
1010                struct client_obd     *cli = &exp->exp_obd->u.cli;
1011                struct sptlrpc_flavor  flvr;
1012
1013                /*
1014                 * empty string means using current flavor, if which haven't
1015                 * been set yet, set it as null.
1016                 *
1017                 * if flavor has been set previously, check the asking flavor
1018                 * must match the existing one.
1019                 */
1020                if (vallen == 0) {
1021                        if (cli->cl_flvr_mgc.sf_rpc != SPTLRPC_FLVR_INVALID)
1022                                return 0;
1023                        val = "null";
1024                        vallen = 4;
1025                }
1026
1027                rc = sptlrpc_parse_flavor(val, &flvr);
1028                if (rc) {
1029                        CERROR("invalid sptlrpc flavor %s to MGS\n",
1030                               (char *) val);
1031                        return rc;
1032                }
1033
1034                /*
1035                 * caller already hold a mutex
1036                 */
1037                if (cli->cl_flvr_mgc.sf_rpc == SPTLRPC_FLVR_INVALID) {
1038                        cli->cl_flvr_mgc = flvr;
1039                } else if (memcmp(&cli->cl_flvr_mgc, &flvr,
1040                                  sizeof(flvr)) != 0) {
1041                        char    str[20];
1042
1043                        sptlrpc_flavor2name(&cli->cl_flvr_mgc,
1044                                            str, sizeof(str));
1045                        LCONSOLE_ERROR("asking sptlrpc flavor %s to MGS but currently %s is in use\n",
1046                                       (char *) val, str);
1047                        rc = -EPERM;
1048                }
1049                return rc;
1050        }
1051
1052        return rc;
1053}
1054
1055static int mgc_get_info(const struct lu_env *env, struct obd_export *exp,
1056                        __u32 keylen, void *key, __u32 *vallen, void *val,
1057                        struct lov_stripe_md *unused)
1058{
1059        int rc = -EINVAL;
1060
1061        if (KEY_IS(KEY_CONN_DATA)) {
1062                struct obd_import *imp = class_exp2cliimp(exp);
1063                struct obd_connect_data *data = val;
1064
1065                if (*vallen == sizeof(*data)) {
1066                        *data = imp->imp_connect_data;
1067                        rc = 0;
1068                }
1069        }
1070
1071        return rc;
1072}
1073
1074static int mgc_import_event(struct obd_device *obd,
1075                            struct obd_import *imp,
1076                            enum obd_import_event event)
1077{
1078        LASSERT(imp->imp_obd == obd);
1079        CDEBUG(D_MGC, "import event %#x\n", event);
1080
1081        switch (event) {
1082        case IMP_EVENT_DISCON:
1083                /* MGC imports should not wait for recovery */
1084                if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1085                        ptlrpc_pinger_ir_down();
1086                break;
1087        case IMP_EVENT_INACTIVE:
1088                break;
1089        case IMP_EVENT_INVALIDATE: {
1090                struct ldlm_namespace *ns = obd->obd_namespace;
1091
1092                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1093                break;
1094        }
1095        case IMP_EVENT_ACTIVE:
1096                CDEBUG(D_INFO, "%s: Reactivating import\n", obd->obd_name);
1097                /* Clearing obd_no_recov allows us to continue pinging */
1098                obd->obd_no_recov = 0;
1099                mgc_notify_active(obd);
1100                if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1101                        ptlrpc_pinger_ir_up();
1102                break;
1103        case IMP_EVENT_OCD:
1104                break;
1105        case IMP_EVENT_DEACTIVATE:
1106        case IMP_EVENT_ACTIVATE:
1107                break;
1108        default:
1109                CERROR("Unknown import event %#x\n", event);
1110                LBUG();
1111        }
1112        return 0;
1113}
1114
1115enum {
1116        CONFIG_READ_NRPAGES_INIT = 1 << (20 - PAGE_SHIFT),
1117        CONFIG_READ_NRPAGES      = 4
1118};
1119
1120static int mgc_apply_recover_logs(struct obd_device *mgc,
1121                                  struct config_llog_data *cld,
1122                                  __u64 max_version,
1123                                  void *data, int datalen, bool mne_swab)
1124{
1125        struct config_llog_instance *cfg = &cld->cld_cfg;
1126        struct mgs_nidtbl_entry *entry;
1127        struct lustre_cfg       *lcfg;
1128        struct lustre_cfg_bufs   bufs;
1129        u64   prev_version = 0;
1130        char *inst;
1131        char *buf;
1132        int   bufsz;
1133        int   pos;
1134        int   rc  = 0;
1135        int   off = 0;
1136
1137        LASSERT(cfg->cfg_instance);
1138        LASSERT(cfg->cfg_sb == cfg->cfg_instance);
1139
1140        inst = kzalloc(PAGE_SIZE, GFP_KERNEL);
1141        if (!inst)
1142                return -ENOMEM;
1143
1144        pos = snprintf(inst, PAGE_SIZE, "%p", cfg->cfg_instance);
1145        if (pos >= PAGE_SIZE) {
1146                kfree(inst);
1147                return -E2BIG;
1148        }
1149
1150        ++pos;
1151        buf   = inst + pos;
1152        bufsz = PAGE_SIZE - pos;
1153
1154        while (datalen > 0) {
1155                int   entry_len = sizeof(*entry);
1156                int   is_ost;
1157                struct obd_device *obd;
1158                char *obdname;
1159                char *cname;
1160                char *params;
1161                char *uuid;
1162
1163                rc = -EINVAL;
1164                if (datalen < sizeof(*entry))
1165                        break;
1166
1167                entry = (typeof(entry))(data + off);
1168
1169                /* sanity check */
1170                if (entry->mne_nid_type != 0) /* only support type 0 for ipv4 */
1171                        break;
1172                if (entry->mne_nid_count == 0) /* at least one nid entry */
1173                        break;
1174                if (entry->mne_nid_size != sizeof(lnet_nid_t))
1175                        break;
1176
1177                entry_len += entry->mne_nid_count * entry->mne_nid_size;
1178                if (datalen < entry_len) /* must have entry_len at least */
1179                        break;
1180
1181                /* Keep this swab for normal mixed endian handling. LU-1644 */
1182                if (mne_swab)
1183                        lustre_swab_mgs_nidtbl_entry(entry);
1184                if (entry->mne_length > PAGE_SIZE) {
1185                        CERROR("MNE too large (%u)\n", entry->mne_length);
1186                        break;
1187                }
1188
1189                if (entry->mne_length < entry_len)
1190                        break;
1191
1192                off     += entry->mne_length;
1193                datalen -= entry->mne_length;
1194                if (datalen < 0)
1195                        break;
1196
1197                if (entry->mne_version > max_version) {
1198                        CERROR("entry index(%lld) is over max_index(%lld)\n",
1199                               entry->mne_version, max_version);
1200                        break;
1201                }
1202
1203                if (prev_version >= entry->mne_version) {
1204                        CERROR("index unsorted, prev %lld, now %lld\n",
1205                               prev_version, entry->mne_version);
1206                        break;
1207                }
1208                prev_version = entry->mne_version;
1209
1210                /*
1211                 * Write a string with format "nid::instance" to
1212                 * lustre/<osc|mdc>/<target>-<osc|mdc>-<instance>/import.
1213                 */
1214
1215                is_ost = entry->mne_type == LDD_F_SV_TYPE_OST;
1216                memset(buf, 0, bufsz);
1217                obdname = buf;
1218                pos = 0;
1219
1220                /* lustre-OST0001-osc-<instance #> */
1221                strcpy(obdname, cld->cld_logname);
1222                cname = strrchr(obdname, '-');
1223                if (!cname) {
1224                        CERROR("mgc %s: invalid logname %s\n",
1225                               mgc->obd_name, obdname);
1226                        break;
1227                }
1228
1229                pos = cname - obdname;
1230                obdname[pos] = 0;
1231                pos += sprintf(obdname + pos, "-%s%04x",
1232                                  is_ost ? "OST" : "MDT", entry->mne_index);
1233
1234                cname = is_ost ? "osc" : "mdc";
1235                pos += sprintf(obdname + pos, "-%s-%s", cname, inst);
1236                lustre_cfg_bufs_reset(&bufs, obdname);
1237
1238                /* find the obd by obdname */
1239                obd = class_name2obd(obdname);
1240                if (!obd) {
1241                        CDEBUG(D_INFO, "mgc %s: cannot find obdname %s\n",
1242                               mgc->obd_name, obdname);
1243                        rc = 0;
1244                        /* this is a safe race, when the ost is starting up...*/
1245                        continue;
1246                }
1247
1248                /* osc.import = "connection=<Conn UUID>::<target instance>" */
1249                ++pos;
1250                params = buf + pos;
1251                pos += sprintf(params, "%s.import=%s", cname, "connection=");
1252                uuid = buf + pos;
1253
1254                down_read(&obd->u.cli.cl_sem);
1255                if (!obd->u.cli.cl_import) {
1256                        /* client does not connect to the OST yet */
1257                        up_read(&obd->u.cli.cl_sem);
1258                        rc = 0;
1259                        continue;
1260                }
1261
1262                /* TODO: iterate all nids to find one */
1263                /* find uuid by nid */
1264                rc = client_import_find_conn(obd->u.cli.cl_import,
1265                                             entry->u.nids[0],
1266                                             (struct obd_uuid *)uuid);
1267                up_read(&obd->u.cli.cl_sem);
1268                if (rc < 0) {
1269                        CERROR("mgc: cannot find uuid by nid %s\n",
1270                               libcfs_nid2str(entry->u.nids[0]));
1271                        break;
1272                }
1273
1274                CDEBUG(D_INFO, "Find uuid %s by nid %s\n",
1275                       uuid, libcfs_nid2str(entry->u.nids[0]));
1276
1277                pos += strlen(uuid);
1278                pos += sprintf(buf + pos, "::%u", entry->mne_instance);
1279                LASSERT(pos < bufsz);
1280
1281                lustre_cfg_bufs_set_string(&bufs, 1, params);
1282
1283                rc = -ENOMEM;
1284                lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
1285                if (IS_ERR(lcfg)) {
1286                        CERROR("mgc: cannot allocate memory\n");
1287                        break;
1288                }
1289
1290                CDEBUG(D_INFO, "ir apply logs %lld/%lld for %s -> %s\n",
1291                       prev_version, max_version, obdname, params);
1292
1293                rc = class_process_config(lcfg);
1294                lustre_cfg_free(lcfg);
1295                if (rc)
1296                        CDEBUG(D_INFO, "process config for %s error %d\n",
1297                               obdname, rc);
1298
1299                /* continue, even one with error */
1300        }
1301
1302        kfree(inst);
1303        return rc;
1304}
1305
1306/**
1307 * This function is called if this client was notified for target restarting
1308 * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery logs.
1309 */
1310static int mgc_process_recover_log(struct obd_device *obd,
1311                                   struct config_llog_data *cld)
1312{
1313        struct ptlrpc_request *req = NULL;
1314        struct config_llog_instance *cfg = &cld->cld_cfg;
1315        struct mgs_config_body *body;
1316        struct mgs_config_res  *res;
1317        struct ptlrpc_bulk_desc *desc;
1318        struct page **pages;
1319        int nrpages;
1320        bool eof = true;
1321        bool mne_swab;
1322        int i;
1323        int ealen;
1324        int rc;
1325
1326        /* allocate buffer for bulk transfer.
1327         * if this is the first time for this mgs to read logs,
1328         * CONFIG_READ_NRPAGES_INIT will be used since it will read all logs
1329         * once; otherwise, it only reads increment of logs, this should be
1330         * small and CONFIG_READ_NRPAGES will be used.
1331         */
1332        nrpages = CONFIG_READ_NRPAGES;
1333        if (cfg->cfg_last_idx == 0) /* the first time */
1334                nrpages = CONFIG_READ_NRPAGES_INIT;
1335
1336        pages = kcalloc(nrpages, sizeof(*pages), GFP_KERNEL);
1337        if (!pages) {
1338                rc = -ENOMEM;
1339                goto out;
1340        }
1341
1342        for (i = 0; i < nrpages; i++) {
1343                pages[i] = alloc_page(GFP_KERNEL);
1344                if (!pages[i]) {
1345                        rc = -ENOMEM;
1346                        goto out;
1347                }
1348        }
1349
1350again:
1351        LASSERT(cld_is_recover(cld));
1352        LASSERT(mutex_is_locked(&cld->cld_lock));
1353        req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
1354                                   &RQF_MGS_CONFIG_READ);
1355        if (!req) {
1356                rc = -ENOMEM;
1357                goto out;
1358        }
1359
1360        rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
1361        if (rc)
1362                goto out;
1363
1364        /* pack request */
1365        body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1366        LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
1367        if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
1368            >= sizeof(body->mcb_name)) {
1369                rc = -E2BIG;
1370                goto out;
1371        }
1372        body->mcb_offset = cfg->cfg_last_idx + 1;
1373        body->mcb_type   = cld->cld_type;
1374        body->mcb_bits   = PAGE_SHIFT;
1375        body->mcb_units  = nrpages;
1376
1377        /* allocate bulk transfer descriptor */
1378        desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK,
1379                                    MGS_BULK_PORTAL);
1380        if (!desc) {
1381                rc = -ENOMEM;
1382                goto out;
1383        }
1384
1385        for (i = 0; i < nrpages; i++)
1386                ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_SIZE);
1387
1388        ptlrpc_request_set_replen(req);
1389        rc = ptlrpc_queue_wait(req);
1390        if (rc)
1391                goto out;
1392
1393        res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1394        if (res->mcr_size < res->mcr_offset) {
1395                rc = -EINVAL;
1396                goto out;
1397        }
1398
1399        /* always update the index even though it might have errors with
1400         * handling the recover logs
1401         */
1402        cfg->cfg_last_idx = res->mcr_offset;
1403        eof = res->mcr_offset == res->mcr_size;
1404
1405        CDEBUG(D_INFO, "Latest version %lld, more %d.\n",
1406               res->mcr_offset, eof == false);
1407
1408        ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
1409        if (ealen < 0) {
1410                rc = ealen;
1411                goto out;
1412        }
1413
1414        if (ealen > nrpages << PAGE_SHIFT) {
1415                rc = -EINVAL;
1416                goto out;
1417        }
1418
1419        if (ealen == 0) { /* no logs transferred */
1420                if (!eof)
1421                        rc = -EINVAL;
1422                goto out;
1423        }
1424
1425        mne_swab = !!ptlrpc_rep_need_swab(req);
1426#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 50, 0)
1427        /* This import flag means the server did an extra swab of IR MNE
1428         * records (fixed in LU-1252), reverse it here if needed. LU-1644
1429         */
1430        if (unlikely(req->rq_import->imp_need_mne_swab))
1431                mne_swab = !mne_swab;
1432#else
1433#warning "LU-1644: Remove old OBD_CONNECT_MNE_SWAB fixup and imp_need_mne_swab"
1434#endif
1435
1436        for (i = 0; i < nrpages && ealen > 0; i++) {
1437                int rc2;
1438                void *ptr;
1439
1440                ptr = kmap(pages[i]);
1441                rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset, ptr,
1442                                             min_t(int, ealen, PAGE_SIZE),
1443                                             mne_swab);
1444                kunmap(pages[i]);
1445                if (rc2 < 0) {
1446                        CWARN("Process recover log %s error %d\n",
1447                              cld->cld_logname, rc2);
1448                        break;
1449                }
1450
1451                ealen -= PAGE_SIZE;
1452        }
1453
1454out:
1455        if (req)
1456                ptlrpc_req_finished(req);
1457
1458        if (rc == 0 && !eof)
1459                goto again;
1460
1461        if (pages) {
1462                for (i = 0; i < nrpages; i++) {
1463                        if (!pages[i])
1464                                break;
1465                        __free_page(pages[i]);
1466                }
1467                kfree(pages);
1468        }
1469        return rc;
1470}
1471
1472/* local_only means it cannot get remote llogs */
1473static int mgc_process_cfg_log(struct obd_device *mgc,
1474                               struct config_llog_data *cld, int local_only)
1475{
1476        struct llog_ctxt        *ctxt;
1477        struct lustre_sb_info   *lsi = NULL;
1478        int                      rc = 0;
1479        bool                     sptlrpc_started = false;
1480        struct lu_env           *env;
1481
1482        LASSERT(cld);
1483        LASSERT(mutex_is_locked(&cld->cld_lock));
1484
1485        /*
1486         * local copy of sptlrpc log is controlled elsewhere, don't try to
1487         * read it up here.
1488         */
1489        if (cld_is_sptlrpc(cld) && local_only)
1490                return 0;
1491
1492        if (cld->cld_cfg.cfg_sb)
1493                lsi = s2lsi(cld->cld_cfg.cfg_sb);
1494
1495        env = kzalloc(sizeof(*env), GFP_KERNEL);
1496        if (!env)
1497                return -ENOMEM;
1498
1499        rc = lu_env_init(env, LCT_MG_THREAD);
1500        if (rc)
1501                goto out_free;
1502
1503        ctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
1504        LASSERT(ctxt);
1505
1506        if (local_only) /* no local log at client side */ {
1507                rc = -EIO;
1508                goto out_pop;
1509        }
1510
1511        if (cld_is_sptlrpc(cld)) {
1512                sptlrpc_conf_log_update_begin(cld->cld_logname);
1513                sptlrpc_started = true;
1514        }
1515
1516        /* logname and instance info should be the same, so use our
1517         * copy of the instance for the update.  The cfg_last_idx will
1518         * be updated here.
1519         */
1520        rc = class_config_parse_llog(env, ctxt, cld->cld_logname,
1521                                     &cld->cld_cfg);
1522
1523out_pop:
1524        __llog_ctxt_put(env, ctxt);
1525
1526        /*
1527         * update settings on existing OBDs. doing it inside
1528         * of llog_process_lock so no device is attaching/detaching
1529         * in parallel.
1530         * the logname must be <fsname>-sptlrpc
1531         */
1532        if (sptlrpc_started) {
1533                LASSERT(cld_is_sptlrpc(cld));
1534                sptlrpc_conf_log_update_end(cld->cld_logname);
1535                class_notify_sptlrpc_conf(cld->cld_logname,
1536                                          strlen(cld->cld_logname) -
1537                                          strlen("-sptlrpc"));
1538        }
1539
1540        lu_env_fini(env);
1541out_free:
1542        kfree(env);
1543        return rc;
1544}
1545
1546/** Get a config log from the MGS and process it.
1547 * This func is called for both clients and servers.
1548 * Copy the log locally before parsing it if appropriate (non-MGS server)
1549 */
1550int mgc_process_log(struct obd_device *mgc, struct config_llog_data *cld)
1551{
1552        struct lustre_handle lockh = { 0 };
1553        __u64 flags = LDLM_FL_NO_LRU;
1554        int rc = 0, rcl;
1555
1556        LASSERT(cld);
1557
1558        /* I don't want multiple processes running process_log at once --
1559         * sounds like badness.  It actually might be fine, as long as
1560         * we're not trying to update from the same log
1561         * simultaneously (in which case we should use a per-log sem.)
1562         */
1563        mutex_lock(&cld->cld_lock);
1564        if (cld->cld_stopping) {
1565                mutex_unlock(&cld->cld_lock);
1566                return 0;
1567        }
1568
1569        OBD_FAIL_TIMEOUT(OBD_FAIL_MGC_PAUSE_PROCESS_LOG, 20);
1570
1571        CDEBUG(D_MGC, "Process log %s:%p from %d\n", cld->cld_logname,
1572               cld->cld_cfg.cfg_instance, cld->cld_cfg.cfg_last_idx + 1);
1573
1574        /* Get the cfg lock on the llog */
1575        rcl = mgc_enqueue(mgc->u.cli.cl_mgc_mgsexp, NULL, LDLM_PLAIN, NULL,
1576                          LCK_CR, &flags, NULL, NULL, NULL,
1577                          cld, 0, NULL, &lockh);
1578        if (rcl == 0) {
1579                /* Get the cld, it will be released in mgc_blocking_ast. */
1580                config_log_get(cld);
1581                rc = ldlm_lock_set_data(&lockh, (void *)cld);
1582                LASSERT(rc == 0);
1583        } else {
1584                CDEBUG(D_MGC, "Can't get cfg lock: %d\n", rcl);
1585
1586                /* mark cld_lostlock so that it will requeue
1587                 * after MGC becomes available.
1588                 */
1589                cld->cld_lostlock = 1;
1590                /* Get extra reference, it will be put in requeue thread */
1591                config_log_get(cld);
1592        }
1593
1594        if (cld_is_recover(cld)) {
1595                rc = 0; /* this is not a fatal error for recover log */
1596                if (rcl == 0)
1597                        rc = mgc_process_recover_log(mgc, cld);
1598        } else {
1599                rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
1600        }
1601
1602        CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n",
1603               mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc);
1604
1605        mutex_unlock(&cld->cld_lock);
1606
1607        /* Now drop the lock so MGS can revoke it */
1608        if (!rcl)
1609                ldlm_lock_decref(&lockh, LCK_CR);
1610
1611        return rc;
1612}
1613
1614/** Called from lustre_process_log.
1615 * LCFG_LOG_START gets the config log from the MGS, processes it to start
1616 * any services, and adds it to the list logs to watch (follow).
1617 */
1618static int mgc_process_config(struct obd_device *obd, u32 len, void *buf)
1619{
1620        struct lustre_cfg *lcfg = buf;
1621        struct config_llog_instance *cfg = NULL;
1622        char *logname;
1623        int rc = 0;
1624
1625        switch (lcfg->lcfg_command) {
1626        case LCFG_LOV_ADD_OBD: {
1627                /* Overloading this cfg command: register a new target */
1628                struct mgs_target_info *mti;
1629
1630                if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
1631                    sizeof(struct mgs_target_info)) {
1632                        rc = -EINVAL;
1633                        goto out;
1634                }
1635
1636                mti = (struct mgs_target_info *)lustre_cfg_buf(lcfg, 1);
1637                CDEBUG(D_MGC, "add_target %s %#x\n",
1638                       mti->mti_svname, mti->mti_flags);
1639                rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
1640                break;
1641        }
1642        case LCFG_LOV_DEL_OBD:
1643                /* Unregister has no meaning at the moment. */
1644                CERROR("lov_del_obd unimplemented\n");
1645                rc = -ENOSYS;
1646                break;
1647        case LCFG_SPTLRPC_CONF: {
1648                rc = sptlrpc_process_config(lcfg);
1649                break;
1650        }
1651        case LCFG_LOG_START: {
1652                struct config_llog_data *cld;
1653                struct super_block *sb;
1654
1655                logname = lustre_cfg_string(lcfg, 1);
1656                cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
1657                sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
1658
1659                CDEBUG(D_MGC, "parse_log %s from %d\n", logname,
1660                       cfg->cfg_last_idx);
1661
1662                /* We're only called through here on the initial mount */
1663                rc = config_log_add(obd, logname, cfg, sb);
1664                if (rc)
1665                        break;
1666                cld = config_log_find(logname, cfg);
1667                if (!cld) {
1668                        rc = -ENOENT;
1669                        break;
1670                }
1671
1672                /* COMPAT_146 */
1673                /* FIXME only set this for old logs!  Right now this forces
1674                 * us to always skip the "inside markers" check
1675                 */
1676                cld->cld_cfg.cfg_flags |= CFG_F_COMPAT146;
1677
1678                rc = mgc_process_log(obd, cld);
1679                if (rc == 0 && cld->cld_recover) {
1680                        if (OCD_HAS_FLAG(&obd->u.cli.cl_import->
1681                                         imp_connect_data, IMP_RECOV)) {
1682                                rc = mgc_process_log(obd, cld->cld_recover);
1683                        } else {
1684                                struct config_llog_data *cir = cld->cld_recover;
1685
1686                                cld->cld_recover = NULL;
1687                                config_log_put(cir);
1688                        }
1689                        if (rc)
1690                                CERROR("Cannot process recover llog %d\n", rc);
1691                }
1692
1693                if (rc == 0 && cld->cld_params) {
1694                        rc = mgc_process_log(obd, cld->cld_params);
1695                        if (rc == -ENOENT) {
1696                                CDEBUG(D_MGC,
1697                                       "There is no params config file yet\n");
1698                                rc = 0;
1699                        }
1700                        /* params log is optional */
1701                        if (rc)
1702                                CERROR(
1703                                       "%s: can't process params llog: rc = %d\n",
1704                                       obd->obd_name, rc);
1705                }
1706                config_log_put(cld);
1707
1708                break;
1709        }
1710        case LCFG_LOG_END: {
1711                logname = lustre_cfg_string(lcfg, 1);
1712
1713                if (lcfg->lcfg_bufcount >= 2)
1714                        cfg = (struct config_llog_instance *)lustre_cfg_buf(
1715                                lcfg, 2);
1716                rc = config_log_end(logname, cfg);
1717                break;
1718        }
1719        default: {
1720                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1721                rc = -EINVAL;
1722                goto out;
1723
1724        }
1725        }
1726out:
1727        return rc;
1728}
1729
1730static struct obd_ops mgc_obd_ops = {
1731        .owner          = THIS_MODULE,
1732        .setup          = mgc_setup,
1733        .precleanup     = mgc_precleanup,
1734        .cleanup        = mgc_cleanup,
1735        .add_conn       = client_import_add_conn,
1736        .del_conn       = client_import_del_conn,
1737        .connect        = client_connect_import,
1738        .disconnect     = client_disconnect_export,
1739        /* .enqueue     = mgc_enqueue, */
1740        /* .iocontrol   = mgc_iocontrol, */
1741        .set_info_async = mgc_set_info_async,
1742        .get_info       = mgc_get_info,
1743        .import_event   = mgc_import_event,
1744        .process_config = mgc_process_config,
1745};
1746
1747static int __init mgc_init(void)
1748{
1749        return class_register_type(&mgc_obd_ops, NULL,
1750                                   LUSTRE_MGC_NAME, NULL);
1751}
1752
1753static void /*__exit*/ mgc_exit(void)
1754{
1755        class_unregister_type(LUSTRE_MGC_NAME);
1756}
1757
1758MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1759MODULE_DESCRIPTION("Lustre Management Client");
1760MODULE_VERSION(LUSTRE_VERSION_STRING);
1761MODULE_LICENSE("GPL");
1762
1763module_init(mgc_init);
1764module_exit(mgc_exit);
1765