linux/drivers/staging/lustre/lustre/ptlrpc/import.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/ptlrpc/import.c
  37 *
  38 * Author: Mike Shaver <shaver@clusterfs.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_RPC
  42
  43#include "../include/obd_support.h"
  44#include "../include/lustre_ha.h"
  45#include "../include/lustre_net.h"
  46#include "../include/lustre_import.h"
  47#include "../include/lustre_export.h"
  48#include "../include/obd.h"
  49#include "../include/obd_cksum.h"
  50#include "../include/obd_class.h"
  51
  52#include "ptlrpc_internal.h"
  53
/*
 * Per-request state stored in the async-args area of a CONNECT request by
 * ptlrpc_connect_import() and handed to ptlrpc_connect_interpret() as its
 * \a data argument.
 */
struct ptlrpc_connect_async_args {
        /* imp_peer_committed_transno sampled before a reconnect was issued;
         * stays 0 for an initial connect */
         __u64 pcaa_peer_committed;
        /* non-zero when the import had no remote handle in use yet, i.e.
         * this CONNECT is the first one rather than a reconnect */
        int pcaa_initial_connect;
};
  58
  59/**
  60 * Updates import \a imp current state to provided \a state value
  61 * Helper function. Must be called under imp_lock.
  62 */
  63static void __import_set_state(struct obd_import *imp,
  64                               enum lustre_imp_state state)
  65{
  66        imp->imp_state = state;
  67        imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
  68        imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
  69                get_seconds();
  70        imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
  71                IMP_STATE_HIST_LEN;
  72}
  73
  74/* A CLOSED import should remain so. */
  75#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
  76do {                                                                           \
  77        if (imp->imp_state != LUSTRE_IMP_CLOSED) {                             \
  78                CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",   \
  79                       imp, obd2cli_tgt(imp->imp_obd),                         \
  80                       ptlrpc_import_state_name(imp->imp_state),               \
  81                       ptlrpc_import_state_name(state));                       \
  82                __import_set_state(imp, state);                                \
  83        }                                                                      \
  84} while (0)
  85
  86#define IMPORT_SET_STATE(imp, state)                                    \
  87do {                                                                    \
  88        spin_lock(&imp->imp_lock);                                      \
  89        IMPORT_SET_STATE_NOLOCK(imp, state);                            \
  90        spin_unlock(&imp->imp_lock);                                    \
  91} while (0)
  92
  93
/*
 * Forward declarations.
 * ptlrpc_connect_interpret() is defined further down in this file but is
 * installed as the rq_interpret_reply callback by ptlrpc_connect_import().
 */
static int ptlrpc_connect_interpret(const struct lu_env *env,
                                    struct ptlrpc_request *request,
                                    void * data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
  98
  99/* Only this function is allowed to change the import state when it is
 100 * CLOSED. I would rather refcount the import and free it after
 101 * disconnection like we do with exports. To do that, the client_obd
 102 * will need to save the peer info somewhere other than in the import,
 103 * though. */
 104int ptlrpc_init_import(struct obd_import *imp)
 105{
 106        spin_lock(&imp->imp_lock);
 107
 108        imp->imp_generation++;
 109        imp->imp_state =  LUSTRE_IMP_NEW;
 110
 111        spin_unlock(&imp->imp_lock);
 112
 113        return 0;
 114}
 115EXPORT_SYMBOL(ptlrpc_init_import);
 116
 117#define UUID_STR "_UUID"
 118void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
 119{
 120        *uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix))
 121                ? uuid : uuid + strlen(prefix);
 122
 123        *uuid_len = strlen(*uuid_start);
 124
 125        if (*uuid_len < strlen(UUID_STR))
 126                return;
 127
 128        if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
 129                    UUID_STR, strlen(UUID_STR)))
 130                *uuid_len -= strlen(UUID_STR);
 131}
 132EXPORT_SYMBOL(deuuidify);
 133
 134/**
 135 * Returns true if import was FULL, false if import was already not
 136 * connected.
 137 * @imp - import to be disconnected
 138 * @conn_cnt - connection count (epoch) of the request that timed out
 139 *           and caused the disconnection.  In some cases, multiple
 140 *           inflight requests can fail to a single target (e.g. OST
 141 *           bulk requests) and if one has already caused a reconnection
 142 *           (increasing the import->conn_cnt) the older failure should
 143 *           not also cause a reconnection.  If zero it forces a reconnect.
 144 */
 145int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
 146{
 147        int rc = 0;
 148
 149        spin_lock(&imp->imp_lock);
 150
 151        if (imp->imp_state == LUSTRE_IMP_FULL &&
 152            (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
 153                char *target_start;
 154                int   target_len;
 155
 156                deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
 157                          &target_start, &target_len);
 158
 159                if (imp->imp_replayable) {
 160                        LCONSOLE_WARN("%s: Connection to %.*s (at %s) was "
 161                               "lost; in progress operations using this "
 162                               "service will wait for recovery to complete\n",
 163                               imp->imp_obd->obd_name, target_len, target_start,
 164                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
 165                } else {
 166                        LCONSOLE_ERROR_MSG(0x166, "%s: Connection to "
 167                               "%.*s (at %s) was lost; in progress "
 168                               "operations using this service will fail\n",
 169                               imp->imp_obd->obd_name,
 170                               target_len, target_start,
 171                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
 172                }
 173                IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
 174                spin_unlock(&imp->imp_lock);
 175
 176                if (obd_dump_on_timeout)
 177                        libcfs_debug_dumplog();
 178
 179                obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
 180                rc = 1;
 181        } else {
 182                spin_unlock(&imp->imp_lock);
 183                CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
 184                       imp->imp_client->cli_name, imp,
 185                       (imp->imp_state == LUSTRE_IMP_FULL &&
 186                        imp->imp_conn_cnt > conn_cnt) ?
 187                       "reconnected" : "not connected", imp->imp_conn_cnt,
 188                       conn_cnt, ptlrpc_import_state_name(imp->imp_state));
 189        }
 190
 191        return rc;
 192}
 193
/*
 * Mark import \a imp invalid and bump its generation, then release
 * imp_lock, abort the in-flight requests and raise IMP_EVENT_INACTIVE.
 *
 * Must be called with imp_lock held! The lock is released by this
 * function, so the caller must not unlock it again.
 */
static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
{
        assert_spin_locked(&imp->imp_lock);

        CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
        imp->imp_invalid = 1;
        imp->imp_generation++;
        spin_unlock(&imp->imp_lock);

        /* both calls below are made after imp_lock has been dropped */
        ptlrpc_abort_inflight(imp);
        obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
}
 207
/*
 * Deactivate import \a imp: take imp_lock and hand it over to
 * ptlrpc_deactivate_and_unlock_import(), which releases it.
 *
 * This acts as a barrier; all existing requests are rejected, and
 * no new requests will be accepted until the import is valid again.
 */
void ptlrpc_deactivate_import(struct obd_import *imp)
{
        spin_lock(&imp->imp_lock);
        /* the callee drops imp_lock; there is deliberately no unlock here */
        ptlrpc_deactivate_and_unlock_import(imp);
}
EXPORT_SYMBOL(ptlrpc_deactivate_import);
 218
 219static unsigned int
 220ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
 221{
 222        long dl;
 223
 224        if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
 225              (req->rq_phase == RQ_PHASE_BULK) ||
 226              (req->rq_phase == RQ_PHASE_NEW)))
 227                return 0;
 228
 229        if (req->rq_timedout)
 230                return 0;
 231
 232        if (req->rq_phase == RQ_PHASE_NEW)
 233                dl = req->rq_sent;
 234        else
 235                dl = req->rq_deadline;
 236
 237        if (dl <= now)
 238                return 0;
 239
 240        return dl - now;
 241}
 242
 243static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
 244{
 245        time_t now = get_seconds();
 246        struct list_head *tmp, *n;
 247        struct ptlrpc_request *req;
 248        unsigned int timeout = 0;
 249
 250        spin_lock(&imp->imp_lock);
 251        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
 252                req = list_entry(tmp, struct ptlrpc_request, rq_list);
 253                timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
 254        }
 255        spin_unlock(&imp->imp_lock);
 256        return timeout;
 257}
 258
 259/**
 260 * This function will invalidate the import, if necessary, then block
 261 * for all the RPC completions, and finally notify the obd to
 262 * invalidate its state (ie cancel locks, clear pending requests,
 263 * etc).
 264 */
 265void ptlrpc_invalidate_import(struct obd_import *imp)
 266{
 267        struct list_head *tmp, *n;
 268        struct ptlrpc_request *req;
 269        struct l_wait_info lwi;
 270        unsigned int timeout;
 271        int rc;
 272
 273        atomic_inc(&imp->imp_inval_count);
 274
 275        if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
 276                ptlrpc_deactivate_import(imp);
 277
 278        CFS_FAIL_TIMEOUT(OBD_FAIL_MGS_CONNECT_NET, 3 * cfs_fail_val / 2);
 279        LASSERT(imp->imp_invalid);
 280
 281        /* Wait forever until inflight == 0. We really can't do it another
 282         * way because in some cases we need to wait for very long reply
 283         * unlink. We can't do anything before that because there is really
 284         * no guarantee that some rdma transfer is not in progress right now. */
 285        do {
 286                /* Calculate max timeout for waiting on rpcs to error
 287                 * out. Use obd_timeout if calculated value is smaller
 288                 * than it. */
 289                if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
 290                        timeout = ptlrpc_inflight_timeout(imp);
 291                        timeout += timeout / 3;
 292
 293                        if (timeout == 0)
 294                                timeout = obd_timeout;
 295                } else {
 296                        /* decrease the interval to increase race condition */
 297                        timeout = 1;
 298                }
 299
 300                CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
 301                       timeout);
 302
 303                /* Wait for all requests to error out and call completion
 304                 * callbacks. Cap it at obd_timeout -- these should all
 305                 * have been locally cancelled by ptlrpc_abort_inflight. */
 306                lwi = LWI_TIMEOUT_INTERVAL(
 307                        cfs_timeout_cap(cfs_time_seconds(timeout)),
 308                        (timeout > 1)?cfs_time_seconds(1):cfs_time_seconds(1)/2,
 309                        NULL, NULL);
 310                rc = l_wait_event(imp->imp_recovery_waitq,
 311                                  (atomic_read(&imp->imp_inflight) == 0),
 312                                  &lwi);
 313                if (rc) {
 314                        const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
 315
 316                        CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
 317                               cli_tgt, rc,
 318                               atomic_read(&imp->imp_inflight));
 319
 320                        spin_lock(&imp->imp_lock);
 321                        if (atomic_read(&imp->imp_inflight) == 0) {
 322                                int count = atomic_read(&imp->imp_unregistering);
 323
 324                                /* We know that "unregistering" rpcs only can
 325                                 * survive in sending or delaying lists (they
 326                                 * maybe waiting for long reply unlink in
 327                                 * sluggish nets). Let's check this. If there
 328                                 * is no inflight and unregistering != 0, this
 329                                 * is bug. */
 330                                LASSERTF(count == 0, "Some RPCs are still "
 331                                         "unregistering: %d\n", count);
 332
 333                                /* Let's save one loop as soon as inflight have
 334                                 * dropped to zero. No new inflights possible at
 335                                 * this point. */
 336                                rc = 0;
 337                        } else {
 338                                list_for_each_safe(tmp, n,
 339                                                       &imp->imp_sending_list) {
 340                                        req = list_entry(tmp,
 341                                                             struct ptlrpc_request,
 342                                                             rq_list);
 343                                        DEBUG_REQ(D_ERROR, req,
 344                                                  "still on sending list");
 345                                }
 346                                list_for_each_safe(tmp, n,
 347                                                       &imp->imp_delayed_list) {
 348                                        req = list_entry(tmp,
 349                                                             struct ptlrpc_request,
 350                                                             rq_list);
 351                                        DEBUG_REQ(D_ERROR, req,
 352                                                  "still on delayed list");
 353                                }
 354
 355                                CERROR("%s: RPCs in \"%s\" phase found (%d). "
 356                                       "Network is sluggish? Waiting them "
 357                                       "to error out.\n", cli_tgt,
 358                                       ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
 359                                       atomic_read(&imp->
 360                                                       imp_unregistering));
 361                        }
 362                        spin_unlock(&imp->imp_lock);
 363                  }
 364        } while (rc != 0);
 365
 366        /*
 367         * Let's additionally check that no new rpcs added to import in
 368         * "invalidate" state.
 369         */
 370        LASSERT(atomic_read(&imp->imp_inflight) == 0);
 371        obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
 372        sptlrpc_import_flush_all_ctx(imp);
 373
 374        atomic_dec(&imp->imp_inval_count);
 375        wake_up_all(&imp->imp_recovery_waitq);
 376}
 377EXPORT_SYMBOL(ptlrpc_invalidate_import);
 378
 379/* unset imp_invalid */
 380void ptlrpc_activate_import(struct obd_import *imp)
 381{
 382        struct obd_device *obd = imp->imp_obd;
 383
 384        spin_lock(&imp->imp_lock);
 385        if (imp->imp_deactive != 0) {
 386                spin_unlock(&imp->imp_lock);
 387                return;
 388        }
 389
 390        imp->imp_invalid = 0;
 391        spin_unlock(&imp->imp_lock);
 392        obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
 393}
 394EXPORT_SYMBOL(ptlrpc_activate_import);
 395
/*
 * Set imp_force_verify on import \a imp and wake the pinger, unless the
 * import is currently CONNECTING.
 *
 * NOTE(review): imp_state is read without imp_lock both for the trace and
 * for the CONNECTING check, so either may observe a state that is changing
 * concurrently -- confirm this is acceptable to callers.
 */
static void ptlrpc_pinger_force(struct obd_import *imp)
{
        CDEBUG(D_HA, "%s: waking up pinger s:%s\n", obd2cli_tgt(imp->imp_obd),
               ptlrpc_import_state_name(imp->imp_state));

        /* only the flag update itself is done under the lock */
        spin_lock(&imp->imp_lock);
        imp->imp_force_verify = 1;
        spin_unlock(&imp->imp_lock);

        if (imp->imp_state != LUSTRE_IMP_CONNECTING)
                ptlrpc_pinger_wake_up();
}
 408
 409void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
 410{
 411        LASSERT(!imp->imp_dlm_fake);
 412
 413        if (ptlrpc_set_import_discon(imp, conn_cnt)) {
 414                if (!imp->imp_replayable) {
 415                        CDEBUG(D_HA, "import %s@%s for %s not replayable, "
 416                               "auto-deactivating\n",
 417                               obd2cli_tgt(imp->imp_obd),
 418                               imp->imp_connection->c_remote_uuid.uuid,
 419                               imp->imp_obd->obd_name);
 420                        ptlrpc_deactivate_import(imp);
 421                }
 422
 423                ptlrpc_pinger_force(imp);
 424        }
 425}
 426EXPORT_SYMBOL(ptlrpc_fail_import);
 427
 428int ptlrpc_reconnect_import(struct obd_import *imp)
 429{
 430#ifdef ENABLE_PINGER
 431        struct l_wait_info lwi;
 432        int secs = cfs_time_seconds(obd_timeout);
 433        int rc;
 434
 435        ptlrpc_pinger_force(imp);
 436
 437        CDEBUG(D_HA, "%s: recovery started, waiting %u seconds\n",
 438               obd2cli_tgt(imp->imp_obd), secs);
 439
 440        lwi = LWI_TIMEOUT(secs, NULL, NULL);
 441        rc = l_wait_event(imp->imp_recovery_waitq,
 442                          !ptlrpc_import_in_recovery(imp), &lwi);
 443        CDEBUG(D_HA, "%s: recovery finished s:%s\n", obd2cli_tgt(imp->imp_obd),
 444               ptlrpc_import_state_name(imp->imp_state));
 445        return rc;
 446#else
 447        ptlrpc_set_import_discon(imp, 0);
 448        /* Force a new connect attempt */
 449        ptlrpc_invalidate_import(imp);
 450        /* Do a fresh connect next time by zeroing the handle */
 451        ptlrpc_disconnect_import(imp, 1);
 452        /* Wait for all invalidate calls to finish */
 453        if (atomic_read(&imp->imp_inval_count) > 0) {
 454                int rc;
 455                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
 456                rc = l_wait_event(imp->imp_recovery_waitq,
 457                                  (atomic_read(&imp->imp_inval_count) == 0),
 458                                  &lwi);
 459                if (rc)
 460                        CERROR("Interrupted, inval=%d\n",
 461                               atomic_read(&imp->imp_inval_count));
 462        }
 463
 464        /* Allow reconnect attempts */
 465        imp->imp_obd->obd_no_recov = 0;
 466        /* Remove 'invalid' flag */
 467        ptlrpc_activate_import(imp);
 468        /* Attempt a new connect */
 469        ptlrpc_recover_import(imp, NULL, 0);
 470        return 0;
 471#endif
 472}
 473EXPORT_SYMBOL(ptlrpc_reconnect_import);
 474
/**
 * Select the connection that import \a imp will use for its next connect
 * attempt and install it (switching to another one if more than one is
 * present). We typically choose the connection that has gone longest
 * without a connect attempt.
 *
 * The whole selection and the connection switch are done under imp_lock.
 *
 * Returns 0 on success, -EINVAL if the import has no connections at all.
 */
static int import_select_connection(struct obd_import *imp)
{
        struct obd_import_conn *imp_conn = NULL, *conn;
        struct obd_export *dlmexp;
        char *target_start;
        int target_len, tried_all = 1;

        spin_lock(&imp->imp_lock);

        if (list_empty(&imp->imp_conn_list)) {
                CERROR("%s: no connections available\n",
                       imp->imp_obd->obd_name);
                spin_unlock(&imp->imp_lock);
                return -EINVAL;
        }

        list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
                CDEBUG(D_HA, "%s: connect to NID %s last attempt %llu\n",
                       imp->imp_obd->obd_name,
                       libcfs_nid2str(conn->oic_conn->c_peer.nid),
                       conn->oic_last_attempt);

                /* If we have not tried this connection since
                   the last successful attempt, go with this one */
                if ((conn->oic_last_attempt == 0) ||
                    cfs_time_beforeq_64(conn->oic_last_attempt,
                                       imp->imp_last_success_conn)) {
                        imp_conn = conn;
                        tried_all = 0;
                        break;
                }

                /* If all of the connections have already been tried
                   since the last successful connection; just choose the
                   least recently used */
                if (!imp_conn)
                        imp_conn = conn;
                else if (cfs_time_before_64(conn->oic_last_attempt,
                                            imp_conn->oic_last_attempt))
                        imp_conn = conn;
        }

        /* if not found, simply choose the current one; a forced reconnect
         * also overrides whatever the loop picked */
        if (!imp_conn || imp->imp_force_reconnect) {
                LASSERT(imp->imp_conn_current);
                imp_conn = imp->imp_conn_current;
                tried_all = 0;
        }
        LASSERT(imp_conn->oic_conn);

        /* If we've tried everything, and we're back to the beginning of the
           list, increase our timeout and try again. It will be reset when
           we do finally connect. (FIXME: really we should wait for all network
           state associated with the last connection attempt to drain before
           trying to reconnect on it.) */
        if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item)) {
                struct adaptive_timeout *at = &imp->imp_at.iat_net_latency;
                /* grow the net latency estimate by CONNECTION_SWITCH_INC,
                 * clamped to CONNECTION_SWITCH_MAX */
                if (at_get(at) < CONNECTION_SWITCH_MAX) {
                        at_measured(at, at_get(at) + CONNECTION_SWITCH_INC);
                        if (at_get(at) > CONNECTION_SWITCH_MAX)
                                at_reset(at, CONNECTION_SWITCH_MAX);
                }
                LASSERT(imp_conn->oic_last_attempt);
                CDEBUG(D_HA, "%s: tried all connections, increasing latency "
                        "to %ds\n", imp->imp_obd->obd_name, at_get(at));
        }

        /* stamp the attempt so later selections see this one as tried */
        imp_conn->oic_last_attempt = cfs_time_current_64();

        /* switch connection, don't mind if it's same as the current one */
        if (imp->imp_connection)
                ptlrpc_connection_put(imp->imp_connection);
        imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);

        /* point the export behind imp_dlm_handle at the same connection,
         * dropping its previous connection reference if it had one */
        dlmexp =  class_conn2export(&imp->imp_dlm_handle);
        LASSERT(dlmexp != NULL);
        if (dlmexp->exp_connection)
                ptlrpc_connection_put(dlmexp->exp_connection);
        dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
        class_export_put(dlmexp);

        if (imp->imp_conn_current != imp_conn) {
                /* only trace a "change" when there was a previous
                 * connection to change from */
                if (imp->imp_conn_current) {
                        deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
                                  &target_start, &target_len);

                        CDEBUG(D_HA, "%s: Connection changing to"
                               " %.*s (at %s)\n",
                               imp->imp_obd->obd_name,
                               target_len, target_start,
                               libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
                }

                imp->imp_conn_current = imp_conn;
        }

        CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
               imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
               libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));

        spin_unlock(&imp->imp_lock);

        return 0;
}
 584
 585/*
 586 * must be called under imp_lock
 587 */
 588static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
 589{
 590        struct ptlrpc_request *req;
 591        struct list_head *tmp;
 592
 593        /* The requests in committed_list always have smaller transnos than
 594         * the requests in replay_list */
 595        if (!list_empty(&imp->imp_committed_list)) {
 596                tmp = imp->imp_committed_list.next;
 597                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
 598                *transno = req->rq_transno;
 599                if (req->rq_transno == 0) {
 600                        DEBUG_REQ(D_ERROR, req,
 601                                  "zero transno in committed_list");
 602                        LBUG();
 603                }
 604                return 1;
 605        }
 606        if (!list_empty(&imp->imp_replay_list)) {
 607                tmp = imp->imp_replay_list.next;
 608                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
 609                *transno = req->rq_transno;
 610                if (req->rq_transno == 0) {
 611                        DEBUG_REQ(D_ERROR, req, "zero transno in replay_list");
 612                        LBUG();
 613                }
 614                return 1;
 615        }
 616        return 0;
 617}
 618
/**
 * Attempt to (re)connect import \a imp. This includes all preparations,
 * initializing CONNECT RPC request and passing it to ptlrpcd for
 * actual sending.
 *
 * Returns 0 on success (the request was queued, or the import is already
 * FULL), -EINVAL if the import is CLOSED, -EALREADY if a connect is
 * already in progress, -ENOMEM if the request cannot be allocated, or the
 * error returned by one of the preparation steps. On a failure after the
 * import was moved to CONNECTING it is put back to DISCON.
 */
int ptlrpc_connect_import(struct obd_import *imp)
{
        struct obd_device *obd = imp->imp_obd;
        int initial_connect = 0;
        int set_transno = 0;
        __u64 committed_before_reconnect = 0;
        struct ptlrpc_request *request;
        /* request buffers handed to ptlrpc_request_bufs_pack(); slot 0 is
         * left NULL */
        char *bufs[] = { NULL,
                         obd2cli_tgt(imp->imp_obd),
                         obd->obd_uuid.uuid,
                         (char *)&imp->imp_dlm_handle,
                         (char *)&imp->imp_connect_data };
        struct ptlrpc_connect_async_args *aa;
        int rc;

        /* these three states return directly, without touching the state
         * or the connection count */
        spin_lock(&imp->imp_lock);
        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                spin_unlock(&imp->imp_lock);
                CERROR("can't connect to a closed import\n");
                return -EINVAL;
        } else if (imp->imp_state == LUSTRE_IMP_FULL) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connected\n");
                return 0;
        } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connecting\n");
                return -EALREADY;
        }

        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);

        /* new connection epoch; it is not rolled back on the error paths
         * below */
        imp->imp_conn_cnt++;
        imp->imp_resend_replay = 0;

        /* no remote handle in use yet means this is the first connect;
         * otherwise remember the peer's committed transno for the reply
         * callback */
        if (!lustre_handle_is_used(&imp->imp_remote_handle))
                initial_connect = 1;
        else
                committed_before_reconnect = imp->imp_peer_committed_transno;

        /* fills ocd_transno with the first transno awaiting replay, if any */
        set_transno = ptlrpc_first_transno(imp,
                                           &imp->imp_connect_data.ocd_transno);
        spin_unlock(&imp->imp_lock);

        rc = import_select_connection(imp);
        if (rc)
                GOTO(out, rc);

        rc = sptlrpc_import_sec_adapt(imp, NULL, NULL);
        if (rc)
                GOTO(out, rc);

        /* Reset connect flags to the originally requested flags, in case
         * the server is updated on-the-fly we will get the new features. */
        imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
        /* Reset ocd_version each time so the server knows the exact versions */
        imp->imp_connect_data.ocd_version = LUSTRE_VERSION_CODE;
        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
        imp->imp_msghdr_flags &= ~MSGHDR_CKSUM_INCOMPAT18;

        rc = obd_reconnect(NULL, imp->imp_obd->obd_self_export, obd,
                           &obd->obd_uuid, &imp->imp_connect_data, NULL);
        if (rc)
                GOTO(out, rc);

        request = ptlrpc_request_alloc(imp, &RQF_MDS_CONNECT);
        if (request == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_bufs_pack(request, LUSTRE_OBD_VERSION,
                                      imp->imp_connect_op, bufs, NULL);
        if (rc) {
                /* the request was never queued, so free it here */
                ptlrpc_request_free(request);
                GOTO(out, rc);
        }

        /* Report the rpc service time to the server so that it knows how long
         * to wait for clients to join recovery */
        lustre_msg_set_service_time(request->rq_reqmsg,
                                    at_timeout2est(request->rq_timeout));

        /* The amount of time we give the server to process the connect req.
         * import_select_connection will increase the net latency on
         * repeated reconnect attempts to cover slow networks.
         * We override/ignore the server rpc completion estimate here,
         * which may be large if this is a reconnect attempt */
        request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
        lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);

        lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);

        /* a CONNECT is neither resent nor delayed, and may only be sent
         * while the import is CONNECTING */
        request->rq_no_resend = request->rq_no_delay = 1;
        request->rq_send_state = LUSTRE_IMP_CONNECTING;
        /* Allow a slightly larger reply for future growth compatibility */
        req_capsule_set_size(&request->rq_pill, &RMF_CONNECT_DATA, RCL_SERVER,
                             sizeof(struct obd_connect_data)+16*sizeof(__u64));
        ptlrpc_request_set_replen(request);
        request->rq_interpret_reply = ptlrpc_connect_interpret;

        /* the callback arguments live inside the request itself; make sure
         * they fit at compile time */
        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = ptlrpc_req_async_args(request);
        memset(aa, 0, sizeof(*aa));

        aa->pcaa_peer_committed = committed_before_reconnect;
        aa->pcaa_initial_connect = initial_connect;

        if (aa->pcaa_initial_connect) {
                /* NOTE(review): replayable is set unconditionally on an
                 * initial connect; presumably the reply callback corrects
                 * it -- not visible from here */
                spin_lock(&imp->imp_lock);
                imp->imp_replayable = 1;
                spin_unlock(&imp->imp_lock);
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_INITIAL);
        }

        if (set_transno)
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_TRANSNO);

        DEBUG_REQ(D_RPCTRACE, request, "(re)connect request (timeout %d)",
                  request->rq_timeout);
        ptlrpcd_add_req(request, PDL_POLICY_ROUND, -1);
        rc = 0;
out:
        /* undo the CONNECTING state set above; IMPORT_SET_STATE leaves a
         * CLOSED import alone */
        if (rc != 0) {
                IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
        }

        return rc;
}
EXPORT_SYMBOL(ptlrpc_connect_import);
 755
 756static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
 757{
 758        int force_verify;
 759
 760        spin_lock(&imp->imp_lock);
 761        force_verify = imp->imp_force_verify != 0;
 762        spin_unlock(&imp->imp_lock);
 763
 764        if (force_verify)
 765                ptlrpc_pinger_wake_up();
 766}
 767
 768static int ptlrpc_busy_reconnect(int rc)
 769{
 770        return (rc == -EBUSY) || (rc == -EAGAIN);
 771}
 772
 773/**
 774 * interpret_reply callback for connect RPCs.
 775 * Looks into returned status of connect operation and decides
 776 * what to do with the import - i.e enter recovery, promote it to
 777 * full state for normal operations of disconnect it due to an error.
 778 */
 779static int ptlrpc_connect_interpret(const struct lu_env *env,
 780                                    struct ptlrpc_request *request,
 781                                    void *data, int rc)
 782{
 783        struct ptlrpc_connect_async_args *aa = data;
 784        struct obd_import *imp = request->rq_import;
 785        struct client_obd *cli = &imp->imp_obd->u.cli;
 786        struct lustre_handle old_hdl;
 787        __u64 old_connect_flags;
 788        int msg_flags;
 789        struct obd_connect_data *ocd;
 790        struct obd_export *exp;
 791        int ret;
 792
 793        spin_lock(&imp->imp_lock);
 794        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
 795                imp->imp_connect_tried = 1;
 796                spin_unlock(&imp->imp_lock);
 797                return 0;
 798        }
 799
 800        if (rc) {
 801                /* if this reconnect to busy export - not need select new target
 802                 * for connecting*/
 803                imp->imp_force_reconnect = ptlrpc_busy_reconnect(rc);
 804                spin_unlock(&imp->imp_lock);
 805                ptlrpc_maybe_ping_import_soon(imp);
 806                GOTO(out, rc);
 807        }
 808        spin_unlock(&imp->imp_lock);
 809
 810        LASSERT(imp->imp_conn_current);
 811
 812        msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
 813
 814        ret = req_capsule_get_size(&request->rq_pill, &RMF_CONNECT_DATA,
 815                                   RCL_SERVER);
 816        /* server replied obd_connect_data is always bigger */
 817        ocd = req_capsule_server_sized_get(&request->rq_pill,
 818                                           &RMF_CONNECT_DATA, ret);
 819
 820        if (ocd == NULL) {
 821                CERROR("%s: no connect data from server\n",
 822                       imp->imp_obd->obd_name);
 823                rc = -EPROTO;
 824                GOTO(out, rc);
 825        }
 826
 827        spin_lock(&imp->imp_lock);
 828
 829        /* All imports are pingable */
 830        imp->imp_pingable = 1;
 831        imp->imp_force_reconnect = 0;
 832        imp->imp_force_verify = 0;
 833
 834        imp->imp_connect_data = *ocd;
 835
 836        CDEBUG(D_HA, "%s: connect to target with instance %u\n",
 837               imp->imp_obd->obd_name, ocd->ocd_instance);
 838        exp = class_conn2export(&imp->imp_dlm_handle);
 839
 840        spin_unlock(&imp->imp_lock);
 841
 842        /* check that server granted subset of flags we asked for. */
 843        if ((ocd->ocd_connect_flags & imp->imp_connect_flags_orig) !=
 844            ocd->ocd_connect_flags) {
 845                CERROR("%s: Server didn't granted asked subset of flags: asked=%#llx grranted=%#llx\n",
 846                       imp->imp_obd->obd_name,imp->imp_connect_flags_orig,
 847                       ocd->ocd_connect_flags);
 848                GOTO(out, rc = -EPROTO);
 849        }
 850
 851        if (!exp) {
 852                /* This could happen if export is cleaned during the
 853                   connect attempt */
 854                CERROR("%s: missing export after connect\n",
 855                       imp->imp_obd->obd_name);
 856                GOTO(out, rc = -ENODEV);
 857        }
 858        old_connect_flags = exp_connect_flags(exp);
 859        exp->exp_connect_data = *ocd;
 860        imp->imp_obd->obd_self_export->exp_connect_data = *ocd;
 861        class_export_put(exp);
 862
 863        obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
 864
 865        if (aa->pcaa_initial_connect) {
 866                spin_lock(&imp->imp_lock);
 867                if (msg_flags & MSG_CONNECT_REPLAYABLE) {
 868                        imp->imp_replayable = 1;
 869                        spin_unlock(&imp->imp_lock);
 870                        CDEBUG(D_HA, "connected to replayable target: %s\n",
 871                               obd2cli_tgt(imp->imp_obd));
 872                } else {
 873                        imp->imp_replayable = 0;
 874                        spin_unlock(&imp->imp_lock);
 875                }
 876
 877                /* if applies, adjust the imp->imp_msg_magic here
 878                 * according to reply flags */
 879
 880                imp->imp_remote_handle =
 881                                *lustre_msg_get_handle(request->rq_repmsg);
 882
 883                /* Initial connects are allowed for clients with non-random
 884                 * uuids when servers are in recovery.  Simply signal the
 885                 * servers replay is complete and wait in REPLAY_WAIT. */
 886                if (msg_flags & MSG_CONNECT_RECOVERING) {
 887                        CDEBUG(D_HA, "connect to %s during recovery\n",
 888                               obd2cli_tgt(imp->imp_obd));
 889                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
 890                } else {
 891                        IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
 892                        ptlrpc_activate_import(imp);
 893                }
 894
 895                GOTO(finish, rc = 0);
 896        }
 897
 898        /* Determine what recovery state to move the import to. */
 899        if (MSG_CONNECT_RECONNECT & msg_flags) {
 900                memset(&old_hdl, 0, sizeof(old_hdl));
 901                if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
 902                            sizeof(old_hdl))) {
 903                        LCONSOLE_WARN("Reconnect to %s (at @%s) failed due bad handle %#llx\n",
 904                                      obd2cli_tgt(imp->imp_obd),
 905                                      imp->imp_connection->c_remote_uuid.uuid,
 906                                      imp->imp_dlm_handle.cookie);
 907                        GOTO(out, rc = -ENOTCONN);
 908                }
 909
 910                if (memcmp(&imp->imp_remote_handle,
 911                           lustre_msg_get_handle(request->rq_repmsg),
 912                           sizeof(imp->imp_remote_handle))) {
 913                        int level = msg_flags & MSG_CONNECT_RECOVERING ?
 914                                D_HA : D_WARNING;
 915
 916                        /* Bug 16611/14775: if server handle have changed,
 917                         * that means some sort of disconnection happened.
 918                         * If the server is not in recovery, that also means it
 919                         * already erased all of our state because of previous
 920                         * eviction. If it is in recovery - we are safe to
 921                         * participate since we can reestablish all of our state
 922                         * with server again */
 923                        if ((MSG_CONNECT_RECOVERING & msg_flags)) {
 924                                CDEBUG(level,"%s@%s changed server handle from %#llx to %#llx but is still in recovery\n",
 925                                       obd2cli_tgt(imp->imp_obd),
 926                                       imp->imp_connection->c_remote_uuid.uuid,
 927                                       imp->imp_remote_handle.cookie,
 928                                       lustre_msg_get_handle(
 929                                       request->rq_repmsg)->cookie);
 930                        } else {
 931                                LCONSOLE_WARN("Evicted from %s (at %s) "
 932                                              "after server handle changed from %#llx to %#llx\n",
 933                                              obd2cli_tgt(imp->imp_obd),
 934                                              imp->imp_connection-> \
 935                                              c_remote_uuid.uuid,
 936                                              imp->imp_remote_handle.cookie,
 937                                              lustre_msg_get_handle(
 938                                              request->rq_repmsg)->cookie);
 939                        }
 940
 941
 942                        imp->imp_remote_handle =
 943                                     *lustre_msg_get_handle(request->rq_repmsg);
 944
 945                        if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
 946                                IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
 947                                GOTO(finish, rc = 0);
 948                        }
 949
 950                } else {
 951                        CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
 952                               obd2cli_tgt(imp->imp_obd),
 953                               imp->imp_connection->c_remote_uuid.uuid);
 954                }
 955
 956                if (imp->imp_invalid) {
 957                        CDEBUG(D_HA, "%s: reconnected but import is invalid; "
 958                               "marking evicted\n", imp->imp_obd->obd_name);
 959                        IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
 960                } else if (MSG_CONNECT_RECOVERING & msg_flags) {
 961                        CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
 962                               imp->imp_obd->obd_name,
 963                               obd2cli_tgt(imp->imp_obd));
 964
 965                        spin_lock(&imp->imp_lock);
 966                        imp->imp_resend_replay = 1;
 967                        spin_unlock(&imp->imp_lock);
 968
 969                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
 970                } else {
 971                        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
 972                }
 973        } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
 974                LASSERT(imp->imp_replayable);
 975                imp->imp_remote_handle =
 976                                *lustre_msg_get_handle(request->rq_repmsg);
 977                imp->imp_last_replay_transno = 0;
 978                IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
 979        } else {
 980                DEBUG_REQ(D_HA, request, "%s: evicting (reconnect/recover flags"
 981                          " not set: %x)", imp->imp_obd->obd_name, msg_flags);
 982                imp->imp_remote_handle =
 983                                *lustre_msg_get_handle(request->rq_repmsg);
 984                IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
 985        }
 986
 987        /* Sanity checks for a reconnected import. */
 988        if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
 989                CERROR("imp_replayable flag does not match server "
 990                       "after reconnect. We should LBUG right here.\n");
 991        }
 992
 993        if (lustre_msg_get_last_committed(request->rq_repmsg) > 0 &&
 994            lustre_msg_get_last_committed(request->rq_repmsg) <
 995            aa->pcaa_peer_committed) {
 996                CERROR("%s went back in time (transno %lld"
 997                       " was previously committed, server now claims %lld"
 998                       ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
 999                       "id=9646\n",
1000                       obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
1001                       lustre_msg_get_last_committed(request->rq_repmsg));
1002        }
1003
1004finish:
1005        rc = ptlrpc_import_recovery_state_machine(imp);
1006        if (rc != 0) {
1007                if (rc == -ENOTCONN) {
1008                        CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
1009                               "invalidating and reconnecting\n",
1010                               obd2cli_tgt(imp->imp_obd),
1011                               imp->imp_connection->c_remote_uuid.uuid);
1012                        ptlrpc_connect_import(imp);
1013                        imp->imp_connect_tried = 1;
1014                        return 0;
1015                }
1016        } else {
1017
1018                spin_lock(&imp->imp_lock);
1019                list_del(&imp->imp_conn_current->oic_item);
1020                list_add(&imp->imp_conn_current->oic_item,
1021                             &imp->imp_conn_list);
1022                imp->imp_last_success_conn =
1023                        imp->imp_conn_current->oic_last_attempt;
1024
1025                spin_unlock(&imp->imp_lock);
1026
1027                if (!ocd->ocd_ibits_known &&
1028                    ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
1029                        CERROR("Inodebits aware server returned zero compatible"
1030                               " bits?\n");
1031
1032                if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1033                    (ocd->ocd_version > LUSTRE_VERSION_CODE +
1034                                        LUSTRE_VERSION_OFFSET_WARN ||
1035                     ocd->ocd_version < LUSTRE_VERSION_CODE -
1036                                        LUSTRE_VERSION_OFFSET_WARN)) {
1037                        /* Sigh, some compilers do not like #ifdef in the middle
1038                           of macro arguments */
1039                        const char *older = "older. Consider upgrading server "
1040                                            "or downgrading client";
1041                        const char *newer = "newer than client version. "
1042                                            "Consider upgrading client";
1043
1044                        LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1045                                      "is much %s (%s)\n",
1046                                      obd2cli_tgt(imp->imp_obd),
1047                                      OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1048                                      OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1049                                      OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1050                                      OBD_OCD_VERSION_FIX(ocd->ocd_version),
1051                                      ocd->ocd_version > LUSTRE_VERSION_CODE ?
1052                                      newer : older, LUSTRE_VERSION_STRING);
1053                }
1054
1055#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 50, 0)
1056                /* Check if server has LU-1252 fix applied to not always swab
1057                 * the IR MNE entries. Do this only once per connection.  This
1058                 * fixup is version-limited, because we don't want to carry the
1059                 * OBD_CONNECT_MNE_SWAB flag around forever, just so long as we
1060                 * need interop with unpatched 2.2 servers.  For newer servers,
1061                 * the client will do MNE swabbing only as needed.  LU-1644 */
1062                if (unlikely((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1063                             !(ocd->ocd_connect_flags & OBD_CONNECT_MNE_SWAB) &&
1064                             OBD_OCD_VERSION_MAJOR(ocd->ocd_version) == 2 &&
1065                             OBD_OCD_VERSION_MINOR(ocd->ocd_version) == 2 &&
1066                             OBD_OCD_VERSION_PATCH(ocd->ocd_version) < 55 &&
1067                             strcmp(imp->imp_obd->obd_type->typ_name,
1068                                    LUSTRE_MGC_NAME) == 0))
1069                        imp->imp_need_mne_swab = 1;
1070                else /* clear if server was upgraded since last connect */
1071                        imp->imp_need_mne_swab = 0;
1072#else
1073#warning "LU-1644: Remove old OBD_CONNECT_MNE_SWAB fixup and imp_need_mne_swab"
1074#endif
1075
1076                if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1077                        /* We sent to the server ocd_cksum_types with bits set
1078                         * for algorithms we understand. The server masked off
1079                         * the checksum types it doesn't support */
1080                        if ((ocd->ocd_cksum_types &
1081                             cksum_types_supported_client()) == 0) {
1082                                LCONSOLE_WARN("The negotiation of the checksum "
1083                                              "algorithm to use with server %s "
1084                                              "failed (%x/%x), disabling "
1085                                              "checksums\n",
1086                                              obd2cli_tgt(imp->imp_obd),
1087                                              ocd->ocd_cksum_types,
1088                                              cksum_types_supported_client());
1089                                cli->cl_checksum = 0;
1090                                cli->cl_supp_cksum_types = OBD_CKSUM_ADLER;
1091                        } else {
1092                                cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1093                        }
1094                } else {
1095                        /* The server does not support OBD_CONNECT_CKSUM.
1096                         * Enforce ADLER for backward compatibility*/
1097                        cli->cl_supp_cksum_types = OBD_CKSUM_ADLER;
1098                }
1099                cli->cl_cksum_type =cksum_type_select(cli->cl_supp_cksum_types);
1100
1101                if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
1102                        cli->cl_max_pages_per_rpc =
1103                                min(ocd->ocd_brw_size >> PAGE_CACHE_SHIFT,
1104                                    cli->cl_max_pages_per_rpc);
1105                else if (imp->imp_connect_op == MDS_CONNECT ||
1106                         imp->imp_connect_op == MGS_CONNECT)
1107                        cli->cl_max_pages_per_rpc = 1;
1108
1109                /* Reset ns_connect_flags only for initial connect. It might be
1110                 * changed in while using FS and if we reset it in reconnect
1111                 * this leads to losing user settings done before such as
1112                 * disable lru_resize, etc. */
1113                if (old_connect_flags != exp_connect_flags(exp) ||
1114                    aa->pcaa_initial_connect) {
1115                        CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server flags: %#llx\n",
1116                               imp->imp_obd->obd_name, ocd->ocd_connect_flags);
1117                        imp->imp_obd->obd_namespace->ns_connect_flags =
1118                                ocd->ocd_connect_flags;
1119                        imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1120                                ocd->ocd_connect_flags;
1121                }
1122
1123                if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1124                    (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1125                        /* We need a per-message support flag, because
1126                           a. we don't know if the incoming connect reply
1127                              supports AT or not (in reply_in_callback)
1128                              until we unpack it.
1129                           b. failovered server means export and flags are gone
1130                              (in ptlrpc_send_reply).
1131                           Can only be set when we know AT is supported at
1132                           both ends */
1133                        imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1134                else
1135                        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1136
1137                if ((ocd->ocd_connect_flags & OBD_CONNECT_FULL20) &&
1138                    (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1139                        imp->imp_msghdr_flags |= MSGHDR_CKSUM_INCOMPAT18;
1140                else
1141                        imp->imp_msghdr_flags &= ~MSGHDR_CKSUM_INCOMPAT18;
1142
1143                LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1144                        (cli->cl_max_pages_per_rpc > 0));
1145        }
1146
1147out:
1148        imp->imp_connect_tried = 1;
1149
1150        if (rc != 0) {
1151                IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1152                if (rc == -EACCES) {
1153                        /*
1154                         * Give up trying to reconnect
1155                         * EACCES means client has no permission for connection
1156                         */
1157                        imp->imp_obd->obd_no_recov = 1;
1158                        ptlrpc_deactivate_import(imp);
1159                }
1160
1161                if (rc == -EPROTO) {
1162                        struct obd_connect_data *ocd;
1163
1164                        /* reply message might not be ready */
1165                        if (request->rq_repmsg == NULL)
1166                                return -EPROTO;
1167
1168                        ocd = req_capsule_server_get(&request->rq_pill,
1169                                                     &RMF_CONNECT_DATA);
1170                        if (ocd &&
1171                            (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1172                            (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1173                                /*
1174                                 * Actually servers are only supposed to refuse
1175                                 * connection from liblustre clients, so we
1176                                 * should never see this from VFS context
1177                                 */
1178                                LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1179                                        "(%d.%d.%d.%d)"
1180                                        " refused connection from this client "
1181                                        "with an incompatible version (%s).  "
1182                                        "Client must be recompiled\n",
1183                                        obd2cli_tgt(imp->imp_obd),
1184                                        OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1185                                        OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1186                                        OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1187                                        OBD_OCD_VERSION_FIX(ocd->ocd_version),
1188                                        LUSTRE_VERSION_STRING);
1189                                ptlrpc_deactivate_import(imp);
1190                                IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1191                        }
1192                        return -EPROTO;
1193                }
1194
1195                ptlrpc_maybe_ping_import_soon(imp);
1196
1197                CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1198                       obd2cli_tgt(imp->imp_obd),
1199                       (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1200        }
1201
1202        wake_up_all(&imp->imp_recovery_waitq);
1203        return rc;
1204}
1205
1206/**
1207 * interpret callback for "completed replay" RPCs.
1208 * \see signal_completed_replay
1209 */
1210static int completed_replay_interpret(const struct lu_env *env,
1211                                      struct ptlrpc_request *req,
1212                                      void * data, int rc)
1213{
1214        atomic_dec(&req->rq_import->imp_replay_inflight);
1215        if (req->rq_status == 0 &&
1216            !req->rq_import->imp_vbr_failed) {
1217                ptlrpc_import_recovery_state_machine(req->rq_import);
1218        } else {
1219                if (req->rq_import->imp_vbr_failed) {
1220                        CDEBUG(D_WARNING,
1221                               "%s: version recovery fails, reconnecting\n",
1222                               req->rq_import->imp_obd->obd_name);
1223                } else {
1224                        CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1225                                     "reconnecting\n",
1226                               req->rq_import->imp_obd->obd_name,
1227                               req->rq_status);
1228                }
1229                ptlrpc_connect_import(req->rq_import);
1230        }
1231
1232        return 0;
1233}
1234
1235/**
1236 * Let server know that we have no requests to replay anymore.
1237 * Achieved by just sending a PING request
1238 */
1239static int signal_completed_replay(struct obd_import *imp)
1240{
1241        struct ptlrpc_request *req;
1242
1243        if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_FINISH_REPLAY)))
1244                return 0;
1245
1246        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1247        atomic_inc(&imp->imp_replay_inflight);
1248
1249        req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING, LUSTRE_OBD_VERSION,
1250                                        OBD_PING);
1251        if (req == NULL) {
1252                atomic_dec(&imp->imp_replay_inflight);
1253                return -ENOMEM;
1254        }
1255
1256        ptlrpc_request_set_replen(req);
1257        req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1258        lustre_msg_add_flags(req->rq_reqmsg,
1259                             MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE);
1260        if (AT_OFF)
1261                req->rq_timeout *= 3;
1262        req->rq_interpret_reply = completed_replay_interpret;
1263
1264        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
1265        return 0;
1266}
1267
1268/**
1269 * In kernel code all import invalidation happens in its own
1270 * separate thread, so that whatever application happened to encounter
1271 * a problem could still be killed or otherwise continue
1272 */
1273static int ptlrpc_invalidate_import_thread(void *data)
1274{
1275        struct obd_import *imp = data;
1276
1277        unshare_fs_struct();
1278
1279        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1280               imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1281               imp->imp_connection->c_remote_uuid.uuid);
1282
1283        ptlrpc_invalidate_import(imp);
1284
1285        if (obd_dump_on_eviction) {
1286                CERROR("dump the log upon eviction\n");
1287                libcfs_debug_dumplog();
1288        }
1289
1290        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1291        ptlrpc_import_recovery_state_machine(imp);
1292
1293        class_import_put(imp);
1294        return 0;
1295}
1296
1297/**
1298 * This is the state machine for client-side recovery on import.
1299 *
1300 * Typically we have two possibly paths. If we came to server and it is not
1301 * in recovery, we just enter IMP_EVICTED state, invalidate our import
1302 * state and reconnect from scratch.
1303 * If we came to server that is in recovery, we enter IMP_REPLAY import state.
1304 * We go through our list of requests to replay and send them to server one by
1305 * one.
1306 * After sending all request from the list we change import state to
1307 * IMP_REPLAY_LOCKS and re-request all the locks we believe we have from server
1308 * and also all the locks we don't yet have and wait for server to grant us.
1309 * After that we send a special "replay completed" request and change import
1310 * state to IMP_REPLAY_WAIT.
1311 * Upon receiving reply to that "replay completed" RPC we enter IMP_RECOVER
1312 * state and resend all requests from sending list.
1313 * After that we promote import to FULL state and send all delayed requests
1314 * and import is fully operational after that.
1315 *
1316 */
1317int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1318{
1319        int rc = 0;
1320        int inflight;
1321        char *target_start;
1322        int target_len;
1323
1324        if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1325                deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1326                          &target_start, &target_len);
1327                /* Don't care about MGC eviction */
1328                if (strcmp(imp->imp_obd->obd_type->typ_name,
1329                           LUSTRE_MGC_NAME) != 0) {
1330                        LCONSOLE_ERROR_MSG(0x167, "%s: This client was evicted "
1331                                           "by %.*s; in progress operations "
1332                                           "using this service will fail.\n",
1333                                           imp->imp_obd->obd_name, target_len,
1334                                           target_start);
1335                }
1336                CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1337                       obd2cli_tgt(imp->imp_obd),
1338                       imp->imp_connection->c_remote_uuid.uuid);
1339                /* reset vbr_failed flag upon eviction */
1340                spin_lock(&imp->imp_lock);
1341                imp->imp_vbr_failed = 0;
1342                spin_unlock(&imp->imp_lock);
1343
1344                {
1345                struct task_struct *task;
1346                /* bug 17802:  XXX client_disconnect_export vs connect request
1347                 * race. if client will evicted at this time, we start
1348                 * invalidate thread without reference to import and import can
1349                 * be freed at same time. */
1350                class_import_get(imp);
1351                task = kthread_run(ptlrpc_invalidate_import_thread, imp,
1352                                     "ll_imp_inval");
1353                if (IS_ERR(task)) {
1354                        class_import_put(imp);
1355                        CERROR("error starting invalidate thread: %d\n", rc);
1356                        rc = PTR_ERR(task);
1357                } else {
1358                        rc = 0;
1359                }
1360                return rc;
1361                }
1362        }
1363
1364        if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1365                CDEBUG(D_HA, "replay requested by %s\n",
1366                       obd2cli_tgt(imp->imp_obd));
1367                rc = ptlrpc_replay_next(imp, &inflight);
1368                if (inflight == 0 &&
1369                    atomic_read(&imp->imp_replay_inflight) == 0) {
1370                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1371                        rc = ldlm_replay_locks(imp);
1372                        if (rc)
1373                                GOTO(out, rc);
1374                }
1375                rc = 0;
1376        }
1377
1378        if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1379                if (atomic_read(&imp->imp_replay_inflight) == 0) {
1380                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1381                        rc = signal_completed_replay(imp);
1382                        if (rc)
1383                                GOTO(out, rc);
1384                }
1385
1386        }
1387
1388        if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1389                if (atomic_read(&imp->imp_replay_inflight) == 0) {
1390                        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1391                }
1392        }
1393
1394        if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1395                CDEBUG(D_HA, "reconnected to %s@%s\n",
1396                       obd2cli_tgt(imp->imp_obd),
1397                       imp->imp_connection->c_remote_uuid.uuid);
1398
1399                rc = ptlrpc_resend(imp);
1400                if (rc)
1401                        GOTO(out, rc);
1402                IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1403                ptlrpc_activate_import(imp);
1404
1405                deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1406                          &target_start, &target_len);
1407                LCONSOLE_INFO("%s: Connection restored to %.*s (at %s)\n",
1408                              imp->imp_obd->obd_name,
1409                              target_len, target_start,
1410                              libcfs_nid2str(imp->imp_connection->c_peer.nid));
1411        }
1412
1413        if (imp->imp_state == LUSTRE_IMP_FULL) {
1414                wake_up_all(&imp->imp_recovery_waitq);
1415                ptlrpc_wake_delayed(imp);
1416        }
1417
1418out:
1419        return rc;
1420}
1421
1422int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1423{
1424        struct ptlrpc_request *req;
1425        int rq_opc, rc = 0;
1426
1427        if (imp->imp_obd->obd_force)
1428                GOTO(set_state, rc);
1429
1430        switch (imp->imp_connect_op) {
1431        case OST_CONNECT:
1432                rq_opc = OST_DISCONNECT;
1433                break;
1434        case MDS_CONNECT:
1435                rq_opc = MDS_DISCONNECT;
1436                break;
1437        case MGS_CONNECT:
1438                rq_opc = MGS_DISCONNECT;
1439                break;
1440        default:
1441                rc = -EINVAL;
1442                CERROR("%s: don't know how to disconnect from %s "
1443                       "(connect_op %d): rc = %d\n",
1444                       imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1445                       imp->imp_connect_op, rc);
1446                return rc;
1447        }
1448
1449        if (ptlrpc_import_in_recovery(imp)) {
1450                struct l_wait_info lwi;
1451                long timeout;
1452
1453                if (AT_OFF) {
1454                        if (imp->imp_server_timeout)
1455                                timeout = cfs_time_seconds(obd_timeout / 2);
1456                        else
1457                                timeout = cfs_time_seconds(obd_timeout);
1458                } else {
1459                        int idx = import_at_get_index(imp,
1460                                imp->imp_client->cli_request_portal);
1461                        timeout = cfs_time_seconds(
1462                                at_get(&imp->imp_at.iat_service_estimate[idx]));
1463                }
1464
1465                lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1466                                       back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1467                rc = l_wait_event(imp->imp_recovery_waitq,
1468                                  !ptlrpc_import_in_recovery(imp), &lwi);
1469
1470        }
1471
1472        spin_lock(&imp->imp_lock);
1473        if (imp->imp_state != LUSTRE_IMP_FULL)
1474                GOTO(out, 0);
1475        spin_unlock(&imp->imp_lock);
1476
1477        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_DISCONNECT,
1478                                        LUSTRE_OBD_VERSION, rq_opc);
1479        if (req) {
1480                /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1481                 * it fails.  We can get through the above with a down server
1482                 * if the client doesn't know the server is gone yet. */
1483                req->rq_no_resend = 1;
1484
1485                /* We want client umounts to happen quickly, no matter the
1486                   server state... */
1487                req->rq_timeout = min_t(int, req->rq_timeout,
1488                                        INITIAL_CONNECT_TIMEOUT);
1489
1490                IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1491                req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1492                ptlrpc_request_set_replen(req);
1493                rc = ptlrpc_queue_wait(req);
1494                ptlrpc_req_finished(req);
1495        }
1496
1497set_state:
1498        spin_lock(&imp->imp_lock);
1499out:
1500        if (noclose)
1501                IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1502        else
1503                IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1504        memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1505        spin_unlock(&imp->imp_lock);
1506
1507        if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ESHUTDOWN)
1508                rc = 0;
1509
1510        return rc;
1511}
1512EXPORT_SYMBOL(ptlrpc_disconnect_import);
1513
1514void ptlrpc_cleanup_imp(struct obd_import *imp)
1515{
1516        spin_lock(&imp->imp_lock);
1517        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1518        imp->imp_generation++;
1519        spin_unlock(&imp->imp_lock);
1520        ptlrpc_abort_inflight(imp);
1521}
1522EXPORT_SYMBOL(ptlrpc_cleanup_imp);
1523
1524/* Adaptive Timeout utils */
1525extern unsigned int at_min, at_max, at_history;
1526
1527/* Bin into timeslices using AT_BINS bins.
1528   This gives us a max of the last binlimit*AT_BINS secs without the storage,
1529   but still smoothing out a return to normalcy from a slow response.
1530   (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1531int at_measured(struct adaptive_timeout *at, unsigned int val)
1532{
1533        unsigned int old = at->at_current;
1534        time_t now = get_seconds();
1535        time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1536
1537        LASSERT(at);
1538        CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1539               val, at, now - at->at_binstart, at->at_current,
1540               at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1541
1542        if (val == 0)
1543                /* 0's don't count, because we never want our timeout to
1544                   drop to 0, and because 0 could mean an error */
1545                return 0;
1546
1547        spin_lock(&at->at_lock);
1548
1549        if (unlikely(at->at_binstart == 0)) {
1550                /* Special case to remove default from history */
1551                at->at_current = val;
1552                at->at_worst_ever = val;
1553                at->at_worst_time = now;
1554                at->at_hist[0] = val;
1555                at->at_binstart = now;
1556        } else if (now - at->at_binstart < binlimit) {
1557                /* in bin 0 */
1558                at->at_hist[0] = max(val, at->at_hist[0]);
1559                at->at_current = max(val, at->at_current);
1560        } else {
1561                int i, shift;
1562                unsigned int maxv = val;
1563                /* move bins over */
1564                shift = (now - at->at_binstart) / binlimit;
1565                LASSERT(shift > 0);
1566                for (i = AT_BINS - 1; i >= 0; i--) {
1567                        if (i >= shift) {
1568                                at->at_hist[i] = at->at_hist[i - shift];
1569                                maxv = max(maxv, at->at_hist[i]);
1570                        } else {
1571                                at->at_hist[i] = 0;
1572                        }
1573                }
1574                at->at_hist[0] = val;
1575                at->at_current = maxv;
1576                at->at_binstart += shift * binlimit;
1577        }
1578
1579        if (at->at_current > at->at_worst_ever) {
1580                at->at_worst_ever = at->at_current;
1581                at->at_worst_time = now;
1582        }
1583
1584        if (at->at_flags & AT_FLG_NOHIST)
1585                /* Only keep last reported val; keeping the rest of the history
1586                   for proc only */
1587                at->at_current = val;
1588
1589        if (at_max > 0)
1590                at->at_current =  min(at->at_current, at_max);
1591        at->at_current =  max(at->at_current, at_min);
1592
1593        if (at->at_current != old)
1594                CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1595                       "(val=%u) hist %u %u %u %u\n", at,
1596                       old, at->at_current, at->at_current - old, val,
1597                       at->at_hist[0], at->at_hist[1], at->at_hist[2],
1598                       at->at_hist[3]);
1599
1600        /* if we changed, report the old value */
1601        old = (at->at_current != old) ? old : 0;
1602
1603        spin_unlock(&at->at_lock);
1604        return old;
1605}
1606
1607/* Find the imp_at index for a given portal; assign if space available */
1608int import_at_get_index(struct obd_import *imp, int portal)
1609{
1610        struct imp_at *at = &imp->imp_at;
1611        int i;
1612
1613        for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1614                if (at->iat_portal[i] == portal)
1615                        return i;
1616                if (at->iat_portal[i] == 0)
1617                        /* unused */
1618                        break;
1619        }
1620
1621        /* Not found in list, add it under a lock */
1622        spin_lock(&imp->imp_lock);
1623
1624        /* Check unused under lock */
1625        for (; i < IMP_AT_MAX_PORTALS; i++) {
1626                if (at->iat_portal[i] == portal)
1627                        goto out;
1628                if (at->iat_portal[i] == 0)
1629                        /* unused */
1630                        break;
1631        }
1632
1633        /* Not enough portals? */
1634        LASSERT(i < IMP_AT_MAX_PORTALS);
1635
1636        at->iat_portal[i] = portal;
1637out:
1638        spin_unlock(&imp->imp_lock);
1639        return i;
1640}
1641