linux/drivers/staging/lustre/lustre/ptlrpc/sec.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/ptlrpc/sec.c
  37 *
  38 * Author: Eric Mei <ericm@clusterfs.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_SEC
  42
  43#include <linux/libcfs/libcfs.h>
  44#include <linux/crypto.h>
  45#include <linux/key.h>
  46
  47#include <obd.h>
  48#include <obd_class.h>
  49#include <obd_support.h>
  50#include <lustre_net.h>
  51#include <lustre_import.h>
  52#include <lustre_dlm.h>
  53#include <lustre_sec.h>
  54
  55#include "ptlrpc_internal.h"
  56
  57/***********************************************
  58 * policy registers                         *
  59 ***********************************************/
  60
  61static rwlock_t policy_lock;
  62static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
  63        NULL,
  64};
  65
  66int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy)
  67{
  68        __u16 number = policy->sp_policy;
  69
  70        LASSERT(policy->sp_name);
  71        LASSERT(policy->sp_cops);
  72        LASSERT(policy->sp_sops);
  73
  74        if (number >= SPTLRPC_POLICY_MAX)
  75                return -EINVAL;
  76
  77        write_lock(&policy_lock);
  78        if (unlikely(policies[number])) {
  79                write_unlock(&policy_lock);
  80                return -EALREADY;
  81        }
  82        policies[number] = policy;
  83        write_unlock(&policy_lock);
  84
  85        CDEBUG(D_SEC, "%s: registered\n", policy->sp_name);
  86        return 0;
  87}
  88EXPORT_SYMBOL(sptlrpc_register_policy);
  89
  90int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy)
  91{
  92        __u16 number = policy->sp_policy;
  93
  94        LASSERT(number < SPTLRPC_POLICY_MAX);
  95
  96        write_lock(&policy_lock);
  97        if (unlikely(policies[number] == NULL)) {
  98                write_unlock(&policy_lock);
  99                CERROR("%s: already unregistered\n", policy->sp_name);
 100                return -EINVAL;
 101        }
 102
 103        LASSERT(policies[number] == policy);
 104        policies[number] = NULL;
 105        write_unlock(&policy_lock);
 106
 107        CDEBUG(D_SEC, "%s: unregistered\n", policy->sp_name);
 108        return 0;
 109}
 110EXPORT_SYMBOL(sptlrpc_unregister_policy);
 111
 112static
 113struct ptlrpc_sec_policy * sptlrpc_wireflavor2policy(__u32 flavor)
 114{
 115        static DEFINE_MUTEX(load_mutex);
 116        static atomic_t       loaded = ATOMIC_INIT(0);
 117        struct ptlrpc_sec_policy *policy;
 118        __u16                number = SPTLRPC_FLVR_POLICY(flavor);
 119        __u16                flag = 0;
 120
 121        if (number >= SPTLRPC_POLICY_MAX)
 122                return NULL;
 123
 124        while (1) {
 125                read_lock(&policy_lock);
 126                policy = policies[number];
 127                if (policy && !try_module_get(policy->sp_owner))
 128                        policy = NULL;
 129                if (policy == NULL)
 130                        flag = atomic_read(&loaded);
 131                read_unlock(&policy_lock);
 132
 133                if (policy != NULL || flag != 0 ||
 134                    number != SPTLRPC_POLICY_GSS)
 135                        break;
 136
 137                /* try to load gss module, once */
 138                mutex_lock(&load_mutex);
 139                if (atomic_read(&loaded) == 0) {
 140                        if (request_module("ptlrpc_gss") == 0)
 141                                CDEBUG(D_SEC,
 142                                       "module ptlrpc_gss loaded on demand\n");
 143                        else
 144                                CERROR("Unable to load module ptlrpc_gss\n");
 145
 146                        atomic_set(&loaded, 1);
 147                }
 148                mutex_unlock(&load_mutex);
 149        }
 150
 151        return policy;
 152}
 153
 154__u32 sptlrpc_name2flavor_base(const char *name)
 155{
 156        if (!strcmp(name, "null"))
 157                return SPTLRPC_FLVR_NULL;
 158        if (!strcmp(name, "plain"))
 159                return SPTLRPC_FLVR_PLAIN;
 160        if (!strcmp(name, "krb5n"))
 161                return SPTLRPC_FLVR_KRB5N;
 162        if (!strcmp(name, "krb5a"))
 163                return SPTLRPC_FLVR_KRB5A;
 164        if (!strcmp(name, "krb5i"))
 165                return SPTLRPC_FLVR_KRB5I;
 166        if (!strcmp(name, "krb5p"))
 167                return SPTLRPC_FLVR_KRB5P;
 168
 169        return SPTLRPC_FLVR_INVALID;
 170}
 171EXPORT_SYMBOL(sptlrpc_name2flavor_base);
 172
 173const char *sptlrpc_flavor2name_base(__u32 flvr)
 174{
 175        __u32   base = SPTLRPC_FLVR_BASE(flvr);
 176
 177        if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL))
 178                return "null";
 179        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_PLAIN))
 180                return "plain";
 181        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5N))
 182                return "krb5n";
 183        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5A))
 184                return "krb5a";
 185        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5I))
 186                return "krb5i";
 187        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5P))
 188                return "krb5p";
 189
 190        CERROR("invalid wire flavor 0x%x\n", flvr);
 191        return "invalid";
 192}
 193EXPORT_SYMBOL(sptlrpc_flavor2name_base);
 194
 195char *sptlrpc_flavor2name_bulk(struct sptlrpc_flavor *sf,
 196                               char *buf, int bufsize)
 197{
 198        if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN)
 199                snprintf(buf, bufsize, "hash:%s",
 200                         sptlrpc_get_hash_name(sf->u_bulk.hash.hash_alg));
 201        else
 202                snprintf(buf, bufsize, "%s",
 203                         sptlrpc_flavor2name_base(sf->sf_rpc));
 204
 205        buf[bufsize - 1] = '\0';
 206        return buf;
 207}
 208EXPORT_SYMBOL(sptlrpc_flavor2name_bulk);
 209
 210char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
 211{
 212        snprintf(buf, bufsize, "%s", sptlrpc_flavor2name_base(sf->sf_rpc));
 213
 214        /*
 215         * currently we don't support customized bulk specification for
 216         * flavors other than plain
 217         */
 218        if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN) {
 219                char bspec[16];
 220
 221                bspec[0] = '-';
 222                sptlrpc_flavor2name_bulk(sf, &bspec[1], sizeof(bspec) - 1);
 223                strncat(buf, bspec, bufsize);
 224        }
 225
 226        buf[bufsize - 1] = '\0';
 227        return buf;
 228}
 229EXPORT_SYMBOL(sptlrpc_flavor2name);
 230
 231char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
 232{
 233        buf[0] = '\0';
 234
 235        if (flags & PTLRPC_SEC_FL_REVERSE)
 236                strlcat(buf, "reverse,", bufsize);
 237        if (flags & PTLRPC_SEC_FL_ROOTONLY)
 238                strlcat(buf, "rootonly,", bufsize);
 239        if (flags & PTLRPC_SEC_FL_UDESC)
 240                strlcat(buf, "udesc,", bufsize);
 241        if (flags & PTLRPC_SEC_FL_BULK)
 242                strlcat(buf, "bulk,", bufsize);
 243        if (buf[0] == '\0')
 244                strlcat(buf, "-,", bufsize);
 245
 246        return buf;
 247}
 248EXPORT_SYMBOL(sptlrpc_secflags2str);
 249
 250/**************************************************
 251 * client context APIs                      *
 252 **************************************************/
 253
 254static
 255struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
 256{
 257        struct vfs_cred vcred;
 258        int create = 1, remove_dead = 1;
 259
 260        LASSERT(sec);
 261        LASSERT(sec->ps_policy->sp_cops->lookup_ctx);
 262
 263        if (sec->ps_flvr.sf_flags & (PTLRPC_SEC_FL_REVERSE |
 264                                     PTLRPC_SEC_FL_ROOTONLY)) {
 265                vcred.vc_uid = 0;
 266                vcred.vc_gid = 0;
 267                if (sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_REVERSE) {
 268                        create = 0;
 269                        remove_dead = 0;
 270                }
 271        } else {
 272                vcred.vc_uid = from_kuid(&init_user_ns, current_uid());
 273                vcred.vc_gid = from_kgid(&init_user_ns, current_gid());
 274        }
 275
 276        return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred,
 277                                                   create, remove_dead);
 278}
 279
 280struct ptlrpc_cli_ctx *sptlrpc_cli_ctx_get(struct ptlrpc_cli_ctx *ctx)
 281{
 282        atomic_inc(&ctx->cc_refcount);
 283        return ctx;
 284}
 285EXPORT_SYMBOL(sptlrpc_cli_ctx_get);
 286
 287void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
 288{
 289        struct ptlrpc_sec *sec = ctx->cc_sec;
 290
 291        LASSERT(sec);
 292        LASSERT_ATOMIC_POS(&ctx->cc_refcount);
 293
 294        if (!atomic_dec_and_test(&ctx->cc_refcount))
 295                return;
 296
 297        sec->ps_policy->sp_cops->release_ctx(sec, ctx, sync);
 298}
 299EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
 300
 301/**
 302 * Expire the client context immediately.
 303 *
 304 * \pre Caller must hold at least 1 reference on the \a ctx.
 305 */
 306void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
 307{
 308        LASSERT(ctx->cc_ops->die);
 309        ctx->cc_ops->die(ctx, 0);
 310}
 311EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
 312
 313/**
 314 * To wake up the threads who are waiting for this client context. Called
 315 * after some status change happened on \a ctx.
 316 */
 317void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
 318{
 319        struct ptlrpc_request *req, *next;
 320
 321        spin_lock(&ctx->cc_lock);
 322        list_for_each_entry_safe(req, next, &ctx->cc_req_list,
 323                                     rq_ctx_chain) {
 324                list_del_init(&req->rq_ctx_chain);
 325                ptlrpc_client_wake_req(req);
 326        }
 327        spin_unlock(&ctx->cc_lock);
 328}
 329EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
 330
 331int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
 332{
 333        LASSERT(ctx->cc_ops);
 334
 335        if (ctx->cc_ops->display == NULL)
 336                return 0;
 337
 338        return ctx->cc_ops->display(ctx, buf, bufsize);
 339}
 340
 341static int import_sec_check_expire(struct obd_import *imp)
 342{
 343        int     adapt = 0;
 344
 345        spin_lock(&imp->imp_lock);
 346        if (imp->imp_sec_expire &&
 347            imp->imp_sec_expire < cfs_time_current_sec()) {
 348                adapt = 1;
 349                imp->imp_sec_expire = 0;
 350        }
 351        spin_unlock(&imp->imp_lock);
 352
 353        if (!adapt)
 354                return 0;
 355
 356        CDEBUG(D_SEC, "found delayed sec adapt expired, do it now\n");
 357        return sptlrpc_import_sec_adapt(imp, NULL, 0);
 358}
 359
 360static int import_sec_validate_get(struct obd_import *imp,
 361                                   struct ptlrpc_sec **sec)
 362{
 363        int     rc;
 364
 365        if (unlikely(imp->imp_sec_expire)) {
 366                rc = import_sec_check_expire(imp);
 367                if (rc)
 368                        return rc;
 369        }
 370
 371        *sec = sptlrpc_import_sec_ref(imp);
 372        if (*sec == NULL) {
 373                CERROR("import %p (%s) with no sec\n",
 374                       imp, ptlrpc_import_state_name(imp->imp_state));
 375                return -EACCES;
 376        }
 377
 378        if (unlikely((*sec)->ps_dying)) {
 379                CERROR("attempt to use dying sec %p\n", sec);
 380                sptlrpc_sec_put(*sec);
 381                return -EACCES;
 382        }
 383
 384        return 0;
 385}
 386
 387/**
 388 * Given a \a req, find or allocate a appropriate context for it.
 389 * \pre req->rq_cli_ctx == NULL.
 390 *
 391 * \retval 0 succeed, and req->rq_cli_ctx is set.
 392 * \retval -ev error number, and req->rq_cli_ctx == NULL.
 393 */
 394int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
 395{
 396        struct obd_import *imp = req->rq_import;
 397        struct ptlrpc_sec *sec;
 398        int             rc;
 399
 400        LASSERT(!req->rq_cli_ctx);
 401        LASSERT(imp);
 402
 403        rc = import_sec_validate_get(imp, &sec);
 404        if (rc)
 405                return rc;
 406
 407        req->rq_cli_ctx = get_my_ctx(sec);
 408
 409        sptlrpc_sec_put(sec);
 410
 411        if (!req->rq_cli_ctx) {
 412                CERROR("req %p: fail to get context\n", req);
 413                return -ENOMEM;
 414        }
 415
 416        return 0;
 417}
 418
 419/**
 420 * Drop the context for \a req.
 421 * \pre req->rq_cli_ctx != NULL.
 422 * \post req->rq_cli_ctx == NULL.
 423 *
 424 * If \a sync == 0, this function should return quickly without sleep;
 425 * otherwise it might trigger and wait for the whole process of sending
 426 * an context-destroying rpc to server.
 427 */
 428void sptlrpc_req_put_ctx(struct ptlrpc_request *req, int sync)
 429{
 430        LASSERT(req);
 431        LASSERT(req->rq_cli_ctx);
 432
 433        /* request might be asked to release earlier while still
 434         * in the context waiting list.
 435         */
 436        if (!list_empty(&req->rq_ctx_chain)) {
 437                spin_lock(&req->rq_cli_ctx->cc_lock);
 438                list_del_init(&req->rq_ctx_chain);
 439                spin_unlock(&req->rq_cli_ctx->cc_lock);
 440        }
 441
 442        sptlrpc_cli_ctx_put(req->rq_cli_ctx, sync);
 443        req->rq_cli_ctx = NULL;
 444}
 445
 446static
 447int sptlrpc_req_ctx_switch(struct ptlrpc_request *req,
 448                           struct ptlrpc_cli_ctx *oldctx,
 449                           struct ptlrpc_cli_ctx *newctx)
 450{
 451        struct sptlrpc_flavor   old_flvr;
 452        char               *reqmsg = NULL; /* to workaround old gcc */
 453        int                  reqmsg_size;
 454        int                  rc = 0;
 455
 456        LASSERT(req->rq_reqmsg);
 457        LASSERT(req->rq_reqlen);
 458        LASSERT(req->rq_replen);
 459
 460        CDEBUG(D_SEC, "req %p: switch ctx %p(%u->%s) -> %p(%u->%s), "
 461               "switch sec %p(%s) -> %p(%s)\n", req,
 462               oldctx, oldctx->cc_vcred.vc_uid, sec2target_str(oldctx->cc_sec),
 463               newctx, newctx->cc_vcred.vc_uid, sec2target_str(newctx->cc_sec),
 464               oldctx->cc_sec, oldctx->cc_sec->ps_policy->sp_name,
 465               newctx->cc_sec, newctx->cc_sec->ps_policy->sp_name);
 466
 467        /* save flavor */
 468        old_flvr = req->rq_flvr;
 469
 470        /* save request message */
 471        reqmsg_size = req->rq_reqlen;
 472        if (reqmsg_size != 0) {
 473                OBD_ALLOC_LARGE(reqmsg, reqmsg_size);
 474                if (reqmsg == NULL)
 475                        return -ENOMEM;
 476                memcpy(reqmsg, req->rq_reqmsg, reqmsg_size);
 477        }
 478
 479        /* release old req/rep buf */
 480        req->rq_cli_ctx = oldctx;
 481        sptlrpc_cli_free_reqbuf(req);
 482        sptlrpc_cli_free_repbuf(req);
 483        req->rq_cli_ctx = newctx;
 484
 485        /* recalculate the flavor */
 486        sptlrpc_req_set_flavor(req, 0);
 487
 488        /* alloc new request buffer
 489         * we don't need to alloc reply buffer here, leave it to the
 490         * rest procedure of ptlrpc */
 491        if (reqmsg_size != 0) {
 492                rc = sptlrpc_cli_alloc_reqbuf(req, reqmsg_size);
 493                if (!rc) {
 494                        LASSERT(req->rq_reqmsg);
 495                        memcpy(req->rq_reqmsg, reqmsg, reqmsg_size);
 496                } else {
 497                        CWARN("failed to alloc reqbuf: %d\n", rc);
 498                        req->rq_flvr = old_flvr;
 499                }
 500
 501                OBD_FREE_LARGE(reqmsg, reqmsg_size);
 502        }
 503        return rc;
 504}
 505
 506/**
 507 * If current context of \a req is dead somehow, e.g. we just switched flavor
 508 * thus marked original contexts dead, we'll find a new context for it. if
 509 * no switch is needed, \a req will end up with the same context.
 510 *
 511 * \note a request must have a context, to keep other parts of code happy.
 512 * In any case of failure during the switching, we must restore the old one.
 513 */
 514int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
 515{
 516        struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx;
 517        struct ptlrpc_cli_ctx *newctx;
 518        int                 rc;
 519
 520        LASSERT(oldctx);
 521
 522        sptlrpc_cli_ctx_get(oldctx);
 523        sptlrpc_req_put_ctx(req, 0);
 524
 525        rc = sptlrpc_req_get_ctx(req);
 526        if (unlikely(rc)) {
 527                LASSERT(!req->rq_cli_ctx);
 528
 529                /* restore old ctx */
 530                req->rq_cli_ctx = oldctx;
 531                return rc;
 532        }
 533
 534        newctx = req->rq_cli_ctx;
 535        LASSERT(newctx);
 536
 537        if (unlikely(newctx == oldctx &&
 538                     test_bit(PTLRPC_CTX_DEAD_BIT, &oldctx->cc_flags))) {
 539                /*
 540                 * still get the old dead ctx, usually means system too busy
 541                 */
 542                CDEBUG(D_SEC,
 543                       "ctx (%p, fl %lx) doesn't switch, relax a little bit\n",
 544                       newctx, newctx->cc_flags);
 545
 546                schedule_timeout_and_set_state(TASK_INTERRUPTIBLE,
 547                                                   HZ);
 548        } else {
 549                /*
 550                 * it's possible newctx == oldctx if we're switching
 551                 * subflavor with the same sec.
 552                 */
 553                rc = sptlrpc_req_ctx_switch(req, oldctx, newctx);
 554                if (rc) {
 555                        /* restore old ctx */
 556                        sptlrpc_req_put_ctx(req, 0);
 557                        req->rq_cli_ctx = oldctx;
 558                        return rc;
 559                }
 560
 561                LASSERT(req->rq_cli_ctx == newctx);
 562        }
 563
 564        sptlrpc_cli_ctx_put(oldctx, 1);
 565        return 0;
 566}
 567EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
 568
 569static
 570int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
 571{
 572        if (cli_ctx_is_refreshed(ctx))
 573                return 1;
 574        return 0;
 575}
 576
 577static
 578int ctx_refresh_timeout(void *data)
 579{
 580        struct ptlrpc_request *req = data;
 581        int rc;
 582
 583        /* conn_cnt is needed in expire_one_request */
 584        lustre_msg_set_conn_cnt(req->rq_reqmsg, req->rq_import->imp_conn_cnt);
 585
 586        rc = ptlrpc_expire_one_request(req, 1);
 587        /* if we started recovery, we should mark this ctx dead; otherwise
 588         * in case of lgssd died nobody would retire this ctx, following
 589         * connecting will still find the same ctx thus cause deadlock.
 590         * there's an assumption that expire time of the request should be
 591         * later than the context refresh expire time.
 592         */
 593        if (rc == 0)
 594                req->rq_cli_ctx->cc_ops->die(req->rq_cli_ctx, 0);
 595        return rc;
 596}
 597
 598static
 599void ctx_refresh_interrupt(void *data)
 600{
 601        struct ptlrpc_request *req = data;
 602
 603        spin_lock(&req->rq_lock);
 604        req->rq_intr = 1;
 605        spin_unlock(&req->rq_lock);
 606}
 607
 608static
 609void req_off_ctx_list(struct ptlrpc_request *req, struct ptlrpc_cli_ctx *ctx)
 610{
 611        spin_lock(&ctx->cc_lock);
 612        if (!list_empty(&req->rq_ctx_chain))
 613                list_del_init(&req->rq_ctx_chain);
 614        spin_unlock(&ctx->cc_lock);
 615}
 616
 617/**
 618 * To refresh the context of \req, if it's not up-to-date.
 619 * \param timeout
 620 * - < 0: don't wait
 621 * - = 0: wait until success or fatal error occur
 622 * - > 0: timeout value (in seconds)
 623 *
 624 * The status of the context could be subject to be changed by other threads
 625 * at any time. We allow this race, but once we return with 0, the caller will
 626 * suppose it's uptodated and keep using it until the owning rpc is done.
 627 *
 628 * \retval 0 only if the context is uptodated.
 629 * \retval -ev error number.
 630 */
 631int sptlrpc_req_refresh_ctx(struct ptlrpc_request *req, long timeout)
 632{
 633        struct ptlrpc_cli_ctx  *ctx = req->rq_cli_ctx;
 634        struct ptlrpc_sec      *sec;
 635        struct l_wait_info      lwi;
 636        int                  rc;
 637
 638        LASSERT(ctx);
 639
 640        if (req->rq_ctx_init || req->rq_ctx_fini)
 641                return 0;
 642
 643        /*
 644         * during the process a request's context might change type even
 645         * (e.g. from gss ctx to null ctx), so each loop we need to re-check
 646         * everything
 647         */
 648again:
 649        rc = import_sec_validate_get(req->rq_import, &sec);
 650        if (rc)
 651                return rc;
 652
 653        if (sec->ps_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
 654                CDEBUG(D_SEC, "req %p: flavor has changed %x -> %x\n",
 655                      req, req->rq_flvr.sf_rpc, sec->ps_flvr.sf_rpc);
 656                req_off_ctx_list(req, ctx);
 657                sptlrpc_req_replace_dead_ctx(req);
 658                ctx = req->rq_cli_ctx;
 659        }
 660        sptlrpc_sec_put(sec);
 661
 662        if (cli_ctx_is_eternal(ctx))
 663                return 0;
 664
 665        if (unlikely(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags))) {
 666                LASSERT(ctx->cc_ops->refresh);
 667                ctx->cc_ops->refresh(ctx);
 668        }
 669        LASSERT(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags) == 0);
 670
 671        LASSERT(ctx->cc_ops->validate);
 672        if (ctx->cc_ops->validate(ctx) == 0) {
 673                req_off_ctx_list(req, ctx);
 674                return 0;
 675        }
 676
 677        if (unlikely(test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags))) {
 678                spin_lock(&req->rq_lock);
 679                req->rq_err = 1;
 680                spin_unlock(&req->rq_lock);
 681                req_off_ctx_list(req, ctx);
 682                return -EPERM;
 683        }
 684
 685        /*
 686         * There's a subtle issue for resending RPCs, suppose following
 687         * situation:
 688         *  1. the request was sent to server.
 689         *  2. recovery was kicked start, after finished the request was
 690         *     marked as resent.
 691         *  3. resend the request.
 692         *  4. old reply from server received, we accept and verify the reply.
 693         *     this has to be success, otherwise the error will be aware
 694         *     by application.
 695         *  5. new reply from server received, dropped by LNet.
 696         *
 697         * Note the xid of old & new request is the same. We can't simply
 698         * change xid for the resent request because the server replies on
 699         * it for reply reconstruction.
 700         *
 701         * Commonly the original context should be uptodate because we
 702         * have a expiry nice time; server will keep its context because
 703         * we at least hold a ref of old context which prevent context
 704         * destroying RPC being sent. So server still can accept the request
 705         * and finish the RPC. But if that's not the case:
 706         *  1. If server side context has been trimmed, a NO_CONTEXT will
 707         *     be returned, gss_cli_ctx_verify/unseal will switch to new
 708         *     context by force.
 709         *  2. Current context never be refreshed, then we are fine: we
 710         *     never really send request with old context before.
 711         */
 712        if (test_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags) &&
 713            unlikely(req->rq_reqmsg) &&
 714            lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
 715                req_off_ctx_list(req, ctx);
 716                return 0;
 717        }
 718
 719        if (unlikely(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags))) {
 720                req_off_ctx_list(req, ctx);
 721                /*
 722                 * don't switch ctx if import was deactivated
 723                 */
 724                if (req->rq_import->imp_deactive) {
 725                        spin_lock(&req->rq_lock);
 726                        req->rq_err = 1;
 727                        spin_unlock(&req->rq_lock);
 728                        return -EINTR;
 729                }
 730
 731                rc = sptlrpc_req_replace_dead_ctx(req);
 732                if (rc) {
 733                        LASSERT(ctx == req->rq_cli_ctx);
 734                        CERROR("req %p: failed to replace dead ctx %p: %d\n",
 735                               req, ctx, rc);
 736                        spin_lock(&req->rq_lock);
 737                        req->rq_err = 1;
 738                        spin_unlock(&req->rq_lock);
 739                        return rc;
 740                }
 741
 742                ctx = req->rq_cli_ctx;
 743                goto again;
 744        }
 745
 746        /*
 747         * Now we're sure this context is during upcall, add myself into
 748         * waiting list
 749         */
 750        spin_lock(&ctx->cc_lock);
 751        if (list_empty(&req->rq_ctx_chain))
 752                list_add(&req->rq_ctx_chain, &ctx->cc_req_list);
 753        spin_unlock(&ctx->cc_lock);
 754
 755        if (timeout < 0)
 756                return -EWOULDBLOCK;
 757
 758        /* Clear any flags that may be present from previous sends */
 759        LASSERT(req->rq_receiving_reply == 0);
 760        spin_lock(&req->rq_lock);
 761        req->rq_err = 0;
 762        req->rq_timedout = 0;
 763        req->rq_resend = 0;
 764        req->rq_restart = 0;
 765        spin_unlock(&req->rq_lock);
 766
 767        lwi = LWI_TIMEOUT_INTR(timeout * HZ, ctx_refresh_timeout,
 768                               ctx_refresh_interrupt, req);
 769        rc = l_wait_event(req->rq_reply_waitq, ctx_check_refresh(ctx), &lwi);
 770
 771        /*
 772         * following cases could lead us here:
 773         * - successfully refreshed;
 774         * - interrupted;
 775         * - timedout, and we don't want recover from the failure;
 776         * - timedout, and waked up upon recovery finished;
 777         * - someone else mark this ctx dead by force;
 778         * - someone invalidate the req and call ptlrpc_client_wake_req(),
 779         *   e.g. ptlrpc_abort_inflight();
 780         */
 781        if (!cli_ctx_is_refreshed(ctx)) {
 782                /* timed out or interruptted */
 783                req_off_ctx_list(req, ctx);
 784
 785                LASSERT(rc != 0);
 786                return rc;
 787        }
 788
 789        goto again;
 790}
 791
 792/**
 793 * Initialize flavor settings for \a req, according to \a opcode.
 794 *
 795 * \note this could be called in two situations:
 796 * - new request from ptlrpc_pre_req(), with proper @opcode
 797 * - old request which changed ctx in the middle, with @opcode == 0
 798 */
 799void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
 800{
 801        struct ptlrpc_sec *sec;
 802
 803        LASSERT(req->rq_import);
 804        LASSERT(req->rq_cli_ctx);
 805        LASSERT(req->rq_cli_ctx->cc_sec);
 806        LASSERT(req->rq_bulk_read == 0 || req->rq_bulk_write == 0);
 807
 808        /* special security flags accoding to opcode */
 809        switch (opcode) {
 810        case OST_READ:
 811        case MDS_READPAGE:
 812        case MGS_CONFIG_READ:
 813        case OBD_IDX_READ:
 814                req->rq_bulk_read = 1;
 815                break;
 816        case OST_WRITE:
 817        case MDS_WRITEPAGE:
 818                req->rq_bulk_write = 1;
 819                break;
 820        case SEC_CTX_INIT:
 821                req->rq_ctx_init = 1;
 822                break;
 823        case SEC_CTX_FINI:
 824                req->rq_ctx_fini = 1;
 825                break;
 826        case 0:
 827                /* init/fini rpc won't be resend, so can't be here */
 828                LASSERT(req->rq_ctx_init == 0);
 829                LASSERT(req->rq_ctx_fini == 0);
 830
 831                /* cleanup flags, which should be recalculated */
 832                req->rq_pack_udesc = 0;
 833                req->rq_pack_bulk = 0;
 834                break;
 835        }
 836
 837        sec = req->rq_cli_ctx->cc_sec;
 838
 839        spin_lock(&sec->ps_lock);
 840        req->rq_flvr = sec->ps_flvr;
 841        spin_unlock(&sec->ps_lock);
 842
 843        /* force SVC_NULL for context initiation rpc, SVC_INTG for context
 844         * destruction rpc */
 845        if (unlikely(req->rq_ctx_init))
 846                flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_NULL);
 847        else if (unlikely(req->rq_ctx_fini))
 848                flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_INTG);
 849
 850        /* user descriptor flag, null security can't do it anyway */
 851        if ((sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC) &&
 852            (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_NULL))
 853                req->rq_pack_udesc = 1;
 854
 855        /* bulk security flag */
 856        if ((req->rq_bulk_read || req->rq_bulk_write) &&
 857            sptlrpc_flavor_has_bulk(&req->rq_flvr))
 858                req->rq_pack_bulk = 1;
 859}
 860
 861void sptlrpc_request_out_callback(struct ptlrpc_request *req)
 862{
 863        if (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_SVC_PRIV)
 864                return;
 865
 866        LASSERT(req->rq_clrbuf);
 867        if (req->rq_pool || !req->rq_reqbuf)
 868                return;
 869
 870        OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
 871        req->rq_reqbuf = NULL;
 872        req->rq_reqbuf_len = 0;
 873}
 874
 875/**
 876 * Given an import \a imp, check whether current user has a valid context
 877 * or not. We may create a new context and try to refresh it, and try
 878 * repeatedly try in case of non-fatal errors. Return 0 means success.
 879 */
 880int sptlrpc_import_check_ctx(struct obd_import *imp)
 881{
 882        struct ptlrpc_sec     *sec;
 883        struct ptlrpc_cli_ctx *ctx;
 884        struct ptlrpc_request *req = NULL;
 885        int rc;
 886
 887        might_sleep();
 888
 889        sec = sptlrpc_import_sec_ref(imp);
 890        ctx = get_my_ctx(sec);
 891        sptlrpc_sec_put(sec);
 892
 893        if (!ctx)
 894                return -ENOMEM;
 895
 896        if (cli_ctx_is_eternal(ctx) ||
 897            ctx->cc_ops->validate(ctx) == 0) {
 898                sptlrpc_cli_ctx_put(ctx, 1);
 899                return 0;
 900        }
 901
 902        if (cli_ctx_is_error(ctx)) {
 903                sptlrpc_cli_ctx_put(ctx, 1);
 904                return -EACCES;
 905        }
 906
 907        OBD_ALLOC_PTR(req);
 908        if (!req)
 909                return -ENOMEM;
 910
 911        spin_lock_init(&req->rq_lock);
 912        atomic_set(&req->rq_refcount, 10000);
 913        INIT_LIST_HEAD(&req->rq_ctx_chain);
 914        init_waitqueue_head(&req->rq_reply_waitq);
 915        init_waitqueue_head(&req->rq_set_waitq);
 916        req->rq_import = imp;
 917        req->rq_flvr = sec->ps_flvr;
 918        req->rq_cli_ctx = ctx;
 919
 920        rc = sptlrpc_req_refresh_ctx(req, 0);
 921        LASSERT(list_empty(&req->rq_ctx_chain));
 922        sptlrpc_cli_ctx_put(req->rq_cli_ctx, 1);
 923        OBD_FREE_PTR(req);
 924
 925        return rc;
 926}
 927
 928/**
 929 * Used by ptlrpc client, to perform the pre-defined security transformation
 930 * upon the request message of \a req. After this function called,
 931 * req->rq_reqmsg is still accessible as clear text.
 932 */
 933int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
 934{
 935        struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
 936        int rc = 0;
 937
 938        LASSERT(ctx);
 939        LASSERT(ctx->cc_sec);
 940        LASSERT(req->rq_reqbuf || req->rq_clrbuf);
 941
 942        /* we wrap bulk request here because now we can be sure
 943         * the context is uptodate.
 944         */
 945        if (req->rq_bulk) {
 946                rc = sptlrpc_cli_wrap_bulk(req, req->rq_bulk);
 947                if (rc)
 948                        return rc;
 949        }
 950
 951        switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
 952        case SPTLRPC_SVC_NULL:
 953        case SPTLRPC_SVC_AUTH:
 954        case SPTLRPC_SVC_INTG:
 955                LASSERT(ctx->cc_ops->sign);
 956                rc = ctx->cc_ops->sign(ctx, req);
 957                break;
 958        case SPTLRPC_SVC_PRIV:
 959                LASSERT(ctx->cc_ops->seal);
 960                rc = ctx->cc_ops->seal(ctx, req);
 961                break;
 962        default:
 963                LBUG();
 964        }
 965
 966        if (rc == 0) {
 967                LASSERT(req->rq_reqdata_len);
 968                LASSERT(req->rq_reqdata_len % 8 == 0);
 969                LASSERT(req->rq_reqdata_len <= req->rq_reqbuf_len);
 970        }
 971
 972        return rc;
 973}
 974
 975static int do_cli_unwrap_reply(struct ptlrpc_request *req)
 976{
 977        struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
 978        int                 rc;
 979
 980        LASSERT(ctx);
 981        LASSERT(ctx->cc_sec);
 982        LASSERT(req->rq_repbuf);
 983        LASSERT(req->rq_repdata);
 984        LASSERT(req->rq_repmsg == NULL);
 985
 986        req->rq_rep_swab_mask = 0;
 987
 988        rc = __lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len);
 989        switch (rc) {
 990        case 1:
 991                lustre_set_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
 992        case 0:
 993                break;
 994        default:
 995                CERROR("failed unpack reply: x"LPU64"\n", req->rq_xid);
 996                return -EPROTO;
 997        }
 998
 999        if (req->rq_repdata_len < sizeof(struct lustre_msg)) {
1000                CERROR("replied data length %d too small\n",
1001                       req->rq_repdata_len);
1002                return -EPROTO;
1003        }
1004
1005        if (SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr) !=
1006            SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
1007                CERROR("reply policy %u doesn't match request policy %u\n",
1008                       SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr),
1009                       SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc));
1010                return -EPROTO;
1011        }
1012
1013        switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
1014        case SPTLRPC_SVC_NULL:
1015        case SPTLRPC_SVC_AUTH:
1016        case SPTLRPC_SVC_INTG:
1017                LASSERT(ctx->cc_ops->verify);
1018                rc = ctx->cc_ops->verify(ctx, req);
1019                break;
1020        case SPTLRPC_SVC_PRIV:
1021                LASSERT(ctx->cc_ops->unseal);
1022                rc = ctx->cc_ops->unseal(ctx, req);
1023                break;
1024        default:
1025                LBUG();
1026        }
1027        LASSERT(rc || req->rq_repmsg || req->rq_resend);
1028
1029        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL &&
1030            !req->rq_ctx_init)
1031                req->rq_rep_swab_mask = 0;
1032        return rc;
1033}
1034
1035/**
1036 * Used by ptlrpc client, to perform security transformation upon the reply
1037 * message of \a req. After return successfully, req->rq_repmsg points to
1038 * the reply message in clear text.
1039 *
1040 * \pre the reply buffer should have been un-posted from LNet, so nothing is
1041 * going to change.
1042 */
1043int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
1044{
1045        LASSERT(req->rq_repbuf);
1046        LASSERT(req->rq_repdata == NULL);
1047        LASSERT(req->rq_repmsg == NULL);
1048        LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
1049
1050        if (req->rq_reply_off == 0 &&
1051            (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
1052                CERROR("real reply with offset 0\n");
1053                return -EPROTO;
1054        }
1055
1056        if (req->rq_reply_off % 8 != 0) {
1057                CERROR("reply at odd offset %u\n", req->rq_reply_off);
1058                return -EPROTO;
1059        }
1060
1061        req->rq_repdata = (struct lustre_msg *)
1062                                (req->rq_repbuf + req->rq_reply_off);
1063        req->rq_repdata_len = req->rq_nob_received;
1064
1065        return do_cli_unwrap_reply(req);
1066}
1067
1068/**
1069 * Used by ptlrpc client, to perform security transformation upon the early
1070 * reply message of \a req. We expect the rq_reply_off is 0, and
1071 * rq_nob_received is the early reply size.
1072 *
1073 * Because the receive buffer might be still posted, the reply data might be
1074 * changed at any time, no matter we're holding rq_lock or not. For this reason
1075 * we allocate a separate ptlrpc_request and reply buffer for early reply
1076 * processing.
1077 *
1078 * \retval 0 success, \a req_ret is filled with a duplicated ptlrpc_request.
1079 * Later the caller must call sptlrpc_cli_finish_early_reply() on the returned
1080 * \a *req_ret to release it.
1081 * \retval -ev error number, and \a req_ret will not be set.
1082 */
1083int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
1084                                   struct ptlrpc_request **req_ret)
1085{
1086        struct ptlrpc_request  *early_req;
1087        char               *early_buf;
1088        int                  early_bufsz, early_size;
1089        int                  rc;
1090
1091        OBD_ALLOC_PTR(early_req);
1092        if (early_req == NULL)
1093                return -ENOMEM;
1094
1095        early_size = req->rq_nob_received;
1096        early_bufsz = size_roundup_power2(early_size);
1097        OBD_ALLOC_LARGE(early_buf, early_bufsz);
1098        if (early_buf == NULL)
1099                GOTO(err_req, rc = -ENOMEM);
1100
1101        /* sanity checkings and copy data out, do it inside spinlock */
1102        spin_lock(&req->rq_lock);
1103
1104        if (req->rq_replied) {
1105                spin_unlock(&req->rq_lock);
1106                GOTO(err_buf, rc = -EALREADY);
1107        }
1108
1109        LASSERT(req->rq_repbuf);
1110        LASSERT(req->rq_repdata == NULL);
1111        LASSERT(req->rq_repmsg == NULL);
1112
1113        if (req->rq_reply_off != 0) {
1114                CERROR("early reply with offset %u\n", req->rq_reply_off);
1115                spin_unlock(&req->rq_lock);
1116                GOTO(err_buf, rc = -EPROTO);
1117        }
1118
1119        if (req->rq_nob_received != early_size) {
1120                /* even another early arrived the size should be the same */
1121                CERROR("data size has changed from %u to %u\n",
1122                       early_size, req->rq_nob_received);
1123                spin_unlock(&req->rq_lock);
1124                GOTO(err_buf, rc = -EINVAL);
1125        }
1126
1127        if (req->rq_nob_received < sizeof(struct lustre_msg)) {
1128                CERROR("early reply length %d too small\n",
1129                       req->rq_nob_received);
1130                spin_unlock(&req->rq_lock);
1131                GOTO(err_buf, rc = -EALREADY);
1132        }
1133
1134        memcpy(early_buf, req->rq_repbuf, early_size);
1135        spin_unlock(&req->rq_lock);
1136
1137        spin_lock_init(&early_req->rq_lock);
1138        early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
1139        early_req->rq_flvr = req->rq_flvr;
1140        early_req->rq_repbuf = early_buf;
1141        early_req->rq_repbuf_len = early_bufsz;
1142        early_req->rq_repdata = (struct lustre_msg *) early_buf;
1143        early_req->rq_repdata_len = early_size;
1144        early_req->rq_early = 1;
1145        early_req->rq_reqmsg = req->rq_reqmsg;
1146
1147        rc = do_cli_unwrap_reply(early_req);
1148        if (rc) {
1149                DEBUG_REQ(D_ADAPTTO, early_req,
1150                          "error %d unwrap early reply", rc);
1151                GOTO(err_ctx, rc);
1152        }
1153
1154        LASSERT(early_req->rq_repmsg);
1155        *req_ret = early_req;
1156        return 0;
1157
1158err_ctx:
1159        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1160err_buf:
1161        OBD_FREE_LARGE(early_buf, early_bufsz);
1162err_req:
1163        OBD_FREE_PTR(early_req);
1164        return rc;
1165}
1166
1167/**
1168 * Used by ptlrpc client, to release a processed early reply \a early_req.
1169 *
1170 * \pre \a early_req was obtained from calling sptlrpc_cli_unwrap_early_reply().
1171 */
1172void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
1173{
1174        LASSERT(early_req->rq_repbuf);
1175        LASSERT(early_req->rq_repdata);
1176        LASSERT(early_req->rq_repmsg);
1177
1178        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1179        OBD_FREE_LARGE(early_req->rq_repbuf, early_req->rq_repbuf_len);
1180        OBD_FREE_PTR(early_req);
1181}
1182
1183/**************************************************
1184 * sec ID                                        *
1185 **************************************************/
1186
1187/*
1188 * "fixed" sec (e.g. null) use sec_id < 0
1189 */
1190static atomic_t sptlrpc_sec_id = ATOMIC_INIT(1);
1191
1192int sptlrpc_get_next_secid(void)
1193{
1194        return atomic_inc_return(&sptlrpc_sec_id);
1195}
1196EXPORT_SYMBOL(sptlrpc_get_next_secid);
1197
1198/**************************************************
1199 * client side high-level security APIs    *
1200 **************************************************/
1201
1202static int sec_cop_flush_ctx_cache(struct ptlrpc_sec *sec, uid_t uid,
1203                                   int grace, int force)
1204{
1205        struct ptlrpc_sec_policy *policy = sec->ps_policy;
1206
1207        LASSERT(policy->sp_cops);
1208        LASSERT(policy->sp_cops->flush_ctx_cache);
1209
1210        return policy->sp_cops->flush_ctx_cache(sec, uid, grace, force);
1211}
1212
1213static void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
1214{
1215        struct ptlrpc_sec_policy *policy = sec->ps_policy;
1216
1217        LASSERT_ATOMIC_ZERO(&sec->ps_refcount);
1218        LASSERT_ATOMIC_ZERO(&sec->ps_nctx);
1219        LASSERT(policy->sp_cops->destroy_sec);
1220
1221        CDEBUG(D_SEC, "%s@%p: being destroied\n", sec->ps_policy->sp_name, sec);
1222
1223        policy->sp_cops->destroy_sec(sec);
1224        sptlrpc_policy_put(policy);
1225}
1226
1227void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
1228{
1229        sec_cop_destroy_sec(sec);
1230}
1231EXPORT_SYMBOL(sptlrpc_sec_destroy);
1232
1233static void sptlrpc_sec_kill(struct ptlrpc_sec *sec)
1234{
1235        LASSERT_ATOMIC_POS(&sec->ps_refcount);
1236
1237        if (sec->ps_policy->sp_cops->kill_sec) {
1238                sec->ps_policy->sp_cops->kill_sec(sec);
1239
1240                sec_cop_flush_ctx_cache(sec, -1, 1, 1);
1241        }
1242}
1243
1244struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec)
1245{
1246        if (sec)
1247                atomic_inc(&sec->ps_refcount);
1248
1249        return sec;
1250}
1251EXPORT_SYMBOL(sptlrpc_sec_get);
1252
1253void sptlrpc_sec_put(struct ptlrpc_sec *sec)
1254{
1255        if (sec) {
1256                LASSERT_ATOMIC_POS(&sec->ps_refcount);
1257
1258                if (atomic_dec_and_test(&sec->ps_refcount)) {
1259                        sptlrpc_gc_del_sec(sec);
1260                        sec_cop_destroy_sec(sec);
1261                }
1262        }
1263}
1264EXPORT_SYMBOL(sptlrpc_sec_put);
1265
1266/*
1267 * policy module is responsible for taking refrence of import
1268 */
1269static
1270struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
1271                                       struct ptlrpc_svc_ctx *svc_ctx,
1272                                       struct sptlrpc_flavor *sf,
1273                                       enum lustre_sec_part sp)
1274{
1275        struct ptlrpc_sec_policy *policy;
1276        struct ptlrpc_sec       *sec;
1277        char                  str[32];
1278
1279        if (svc_ctx) {
1280                LASSERT(imp->imp_dlm_fake == 1);
1281
1282                CDEBUG(D_SEC, "%s %s: reverse sec using flavor %s\n",
1283                       imp->imp_obd->obd_type->typ_name,
1284                       imp->imp_obd->obd_name,
1285                       sptlrpc_flavor2name(sf, str, sizeof(str)));
1286
1287                policy = sptlrpc_policy_get(svc_ctx->sc_policy);
1288                sf->sf_flags |= PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
1289        } else {
1290                LASSERT(imp->imp_dlm_fake == 0);
1291
1292                CDEBUG(D_SEC, "%s %s: select security flavor %s\n",
1293                       imp->imp_obd->obd_type->typ_name,
1294                       imp->imp_obd->obd_name,
1295                       sptlrpc_flavor2name(sf, str, sizeof(str)));
1296
1297                policy = sptlrpc_wireflavor2policy(sf->sf_rpc);
1298                if (!policy) {
1299                        CERROR("invalid flavor 0x%x\n", sf->sf_rpc);
1300                        return NULL;
1301                }
1302        }
1303
1304        sec = policy->sp_cops->create_sec(imp, svc_ctx, sf);
1305        if (sec) {
1306                atomic_inc(&sec->ps_refcount);
1307
1308                sec->ps_part = sp;
1309
1310                if (sec->ps_gc_interval && policy->sp_cops->gc_ctx)
1311                        sptlrpc_gc_add_sec(sec);
1312        } else {
1313                sptlrpc_policy_put(policy);
1314        }
1315
1316        return sec;
1317}
1318
1319struct ptlrpc_sec *sptlrpc_import_sec_ref(struct obd_import *imp)
1320{
1321        struct ptlrpc_sec *sec;
1322
1323        spin_lock(&imp->imp_lock);
1324        sec = sptlrpc_sec_get(imp->imp_sec);
1325        spin_unlock(&imp->imp_lock);
1326
1327        return sec;
1328}
1329EXPORT_SYMBOL(sptlrpc_import_sec_ref);
1330
1331static void sptlrpc_import_sec_install(struct obd_import *imp,
1332                                       struct ptlrpc_sec *sec)
1333{
1334        struct ptlrpc_sec *old_sec;
1335
1336        LASSERT_ATOMIC_POS(&sec->ps_refcount);
1337
1338        spin_lock(&imp->imp_lock);
1339        old_sec = imp->imp_sec;
1340        imp->imp_sec = sec;
1341        spin_unlock(&imp->imp_lock);
1342
1343        if (old_sec) {
1344                sptlrpc_sec_kill(old_sec);
1345
1346                /* balance the ref taken by this import */
1347                sptlrpc_sec_put(old_sec);
1348        }
1349}
1350
1351static inline
1352int flavor_equal(struct sptlrpc_flavor *sf1, struct sptlrpc_flavor *sf2)
1353{
1354        return (memcmp(sf1, sf2, sizeof(*sf1)) == 0);
1355}
1356
1357static inline
1358void flavor_copy(struct sptlrpc_flavor *dst, struct sptlrpc_flavor *src)
1359{
1360        *dst = *src;
1361}
1362
1363static void sptlrpc_import_sec_adapt_inplace(struct obd_import *imp,
1364                                             struct ptlrpc_sec *sec,
1365                                             struct sptlrpc_flavor *sf)
1366{
1367        char    str1[32], str2[32];
1368
1369        if (sec->ps_flvr.sf_flags != sf->sf_flags)
1370                CDEBUG(D_SEC, "changing sec flags: %s -> %s\n",
1371                       sptlrpc_secflags2str(sec->ps_flvr.sf_flags,
1372                                            str1, sizeof(str1)),
1373                       sptlrpc_secflags2str(sf->sf_flags,
1374                                            str2, sizeof(str2)));
1375
1376        spin_lock(&sec->ps_lock);
1377        flavor_copy(&sec->ps_flvr, sf);
1378        spin_unlock(&sec->ps_lock);
1379}
1380
1381/**
1382 * To get an appropriate ptlrpc_sec for the \a imp, according to the current
1383 * configuration. Upon called, imp->imp_sec may or may not be NULL.
1384 *
1385 *  - regular import: \a svc_ctx should be NULL and \a flvr is ignored;
1386 *  - reverse import: \a svc_ctx and \a flvr are obtained from incoming request.
1387 */
1388int sptlrpc_import_sec_adapt(struct obd_import *imp,
1389                             struct ptlrpc_svc_ctx *svc_ctx,
1390                             struct sptlrpc_flavor *flvr)
1391{
1392        struct ptlrpc_connection   *conn;
1393        struct sptlrpc_flavor       sf;
1394        struct ptlrpc_sec         *sec, *newsec;
1395        enum lustre_sec_part    sp;
1396        char                    str[24];
1397        int                      rc = 0;
1398
1399        might_sleep();
1400
1401        if (imp == NULL)
1402                return 0;
1403
1404        conn = imp->imp_connection;
1405
1406        if (svc_ctx == NULL) {
1407                struct client_obd *cliobd = &imp->imp_obd->u.cli;
1408                /*
1409                 * normal import, determine flavor from rule set, except
1410                 * for mgc the flavor is predetermined.
1411                 */
1412                if (cliobd->cl_sp_me == LUSTRE_SP_MGC)
1413                        sf = cliobd->cl_flvr_mgc;
1414                else
1415                        sptlrpc_conf_choose_flavor(cliobd->cl_sp_me,
1416                                                   cliobd->cl_sp_to,
1417                                                   &cliobd->cl_target_uuid,
1418                                                   conn->c_self, &sf);
1419
1420                sp = imp->imp_obd->u.cli.cl_sp_me;
1421        } else {
1422                /* reverse import, determine flavor from incoming reqeust */
1423                sf = *flvr;
1424
1425                if (sf.sf_rpc != SPTLRPC_FLVR_NULL)
1426                        sf.sf_flags = PTLRPC_SEC_FL_REVERSE |
1427                                      PTLRPC_SEC_FL_ROOTONLY;
1428
1429                sp = sptlrpc_target_sec_part(imp->imp_obd);
1430        }
1431
1432        sec = sptlrpc_import_sec_ref(imp);
1433        if (sec) {
1434                char    str2[24];
1435
1436                if (flavor_equal(&sf, &sec->ps_flvr))
1437                        GOTO(out, rc);
1438
1439                CDEBUG(D_SEC, "import %s->%s: changing flavor %s -> %s\n",
1440                       imp->imp_obd->obd_name,
1441                       obd_uuid2str(&conn->c_remote_uuid),
1442                       sptlrpc_flavor2name(&sec->ps_flvr, str, sizeof(str)),
1443                       sptlrpc_flavor2name(&sf, str2, sizeof(str2)));
1444
1445                if (SPTLRPC_FLVR_POLICY(sf.sf_rpc) ==
1446                    SPTLRPC_FLVR_POLICY(sec->ps_flvr.sf_rpc) &&
1447                    SPTLRPC_FLVR_MECH(sf.sf_rpc) ==
1448                    SPTLRPC_FLVR_MECH(sec->ps_flvr.sf_rpc)) {
1449                        sptlrpc_import_sec_adapt_inplace(imp, sec, &sf);
1450                        GOTO(out, rc);
1451                }
1452        } else if (SPTLRPC_FLVR_BASE(sf.sf_rpc) !=
1453                   SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL)) {
1454                CDEBUG(D_SEC, "import %s->%s netid %x: select flavor %s\n",
1455                       imp->imp_obd->obd_name,
1456                       obd_uuid2str(&conn->c_remote_uuid),
1457                       LNET_NIDNET(conn->c_self),
1458                       sptlrpc_flavor2name(&sf, str, sizeof(str)));
1459        }
1460
1461        mutex_lock(&imp->imp_sec_mutex);
1462
1463        newsec = sptlrpc_sec_create(imp, svc_ctx, &sf, sp);
1464        if (newsec) {
1465                sptlrpc_import_sec_install(imp, newsec);
1466        } else {
1467                CERROR("import %s->%s: failed to create new sec\n",
1468                       imp->imp_obd->obd_name,
1469                       obd_uuid2str(&conn->c_remote_uuid));
1470                rc = -EPERM;
1471        }
1472
1473        mutex_unlock(&imp->imp_sec_mutex);
1474out:
1475        sptlrpc_sec_put(sec);
1476        return rc;
1477}
1478
1479void sptlrpc_import_sec_put(struct obd_import *imp)
1480{
1481        if (imp->imp_sec) {
1482                sptlrpc_sec_kill(imp->imp_sec);
1483
1484                sptlrpc_sec_put(imp->imp_sec);
1485                imp->imp_sec = NULL;
1486        }
1487}
1488
1489static void import_flush_ctx_common(struct obd_import *imp,
1490                                    uid_t uid, int grace, int force)
1491{
1492        struct ptlrpc_sec *sec;
1493
1494        if (imp == NULL)
1495                return;
1496
1497        sec = sptlrpc_import_sec_ref(imp);
1498        if (sec == NULL)
1499                return;
1500
1501        sec_cop_flush_ctx_cache(sec, uid, grace, force);
1502        sptlrpc_sec_put(sec);
1503}
1504
1505void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
1506{
1507        /* it's important to use grace mode, see explain in
1508         * sptlrpc_req_refresh_ctx() */
1509        import_flush_ctx_common(imp, 0, 1, 1);
1510}
1511
1512void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
1513{
1514        import_flush_ctx_common(imp, from_kuid(&init_user_ns, current_uid()),
1515                                1, 1);
1516}
1517EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
1518
1519void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
1520{
1521        import_flush_ctx_common(imp, -1, 1, 1);
1522}
1523EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
1524
1525/**
1526 * Used by ptlrpc client to allocate request buffer of \a req. Upon return
1527 * successfully, req->rq_reqmsg points to a buffer with size \a msgsize.
1528 */
1529int sptlrpc_cli_alloc_reqbuf(struct ptlrpc_request *req, int msgsize)
1530{
1531        struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1532        struct ptlrpc_sec_policy *policy;
1533        int rc;
1534
1535        LASSERT(ctx);
1536        LASSERT(ctx->cc_sec);
1537        LASSERT(ctx->cc_sec->ps_policy);
1538        LASSERT(req->rq_reqmsg == NULL);
1539        LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1540
1541        policy = ctx->cc_sec->ps_policy;
1542        rc = policy->sp_cops->alloc_reqbuf(ctx->cc_sec, req, msgsize);
1543        if (!rc) {
1544                LASSERT(req->rq_reqmsg);
1545                LASSERT(req->rq_reqbuf || req->rq_clrbuf);
1546
1547                /* zeroing preallocated buffer */
1548                if (req->rq_pool)
1549                        memset(req->rq_reqmsg, 0, msgsize);
1550        }
1551
1552        return rc;
1553}
1554
1555/**
1556 * Used by ptlrpc client to free request buffer of \a req. After this
1557 * req->rq_reqmsg is set to NULL and should not be accessed anymore.
1558 */
1559void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req)
1560{
1561        struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1562        struct ptlrpc_sec_policy *policy;
1563
1564        LASSERT(ctx);
1565        LASSERT(ctx->cc_sec);
1566        LASSERT(ctx->cc_sec->ps_policy);
1567        LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1568
1569        if (req->rq_reqbuf == NULL && req->rq_clrbuf == NULL)
1570                return;
1571
1572        policy = ctx->cc_sec->ps_policy;
1573        policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
1574        req->rq_reqmsg = NULL;
1575}
1576
1577/*
1578 * NOTE caller must guarantee the buffer size is enough for the enlargement
1579 */
1580void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
1581                                  int segment, int newsize)
1582{
1583        void   *src, *dst;
1584        int     oldsize, oldmsg_size, movesize;
1585
1586        LASSERT(segment < msg->lm_bufcount);
1587        LASSERT(msg->lm_buflens[segment] <= newsize);
1588
1589        if (msg->lm_buflens[segment] == newsize)
1590                return;
1591
1592        /* nothing to do if we are enlarging the last segment */
1593        if (segment == msg->lm_bufcount - 1) {
1594                msg->lm_buflens[segment] = newsize;
1595                return;
1596        }
1597
1598        oldsize = msg->lm_buflens[segment];
1599
1600        src = lustre_msg_buf(msg, segment + 1, 0);
1601        msg->lm_buflens[segment] = newsize;
1602        dst = lustre_msg_buf(msg, segment + 1, 0);
1603        msg->lm_buflens[segment] = oldsize;
1604
1605        /* move from segment + 1 to end segment */
1606        LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
1607        oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
1608        movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
1609        LASSERT(movesize >= 0);
1610
1611        if (movesize)
1612                memmove(dst, src, movesize);
1613
1614        /* note we don't clear the ares where old data live, not secret */
1615
1616        /* finally set new segment size */
1617        msg->lm_buflens[segment] = newsize;
1618}
1619EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
1620
1621/**
1622 * Used by ptlrpc client to enlarge the \a segment of request message pointed
1623 * by req->rq_reqmsg to size \a newsize, all previously filled-in data will be
1624 * preserved after the enlargement. this must be called after original request
1625 * buffer being allocated.
1626 *
1627 * \note after this be called, rq_reqmsg and rq_reqlen might have been changed,
1628 * so caller should refresh its local pointers if needed.
1629 */
1630int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
1631                               int segment, int newsize)
1632{
1633        struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
1634        struct ptlrpc_sec_cops   *cops;
1635        struct lustre_msg       *msg = req->rq_reqmsg;
1636
1637        LASSERT(ctx);
1638        LASSERT(msg);
1639        LASSERT(msg->lm_bufcount > segment);
1640        LASSERT(msg->lm_buflens[segment] <= newsize);
1641
1642        if (msg->lm_buflens[segment] == newsize)
1643                return 0;
1644
1645        cops = ctx->cc_sec->ps_policy->sp_cops;
1646        LASSERT(cops->enlarge_reqbuf);
1647        return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
1648}
1649EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
1650
1651/**
1652 * Used by ptlrpc client to allocate reply buffer of \a req.
1653 *
1654 * \note After this, req->rq_repmsg is still not accessible.
1655 */
1656int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize)
1657{
1658        struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1659        struct ptlrpc_sec_policy *policy;
1660
1661        LASSERT(ctx);
1662        LASSERT(ctx->cc_sec);
1663        LASSERT(ctx->cc_sec->ps_policy);
1664
1665        if (req->rq_repbuf)
1666                return 0;
1667
1668        policy = ctx->cc_sec->ps_policy;
1669        return policy->sp_cops->alloc_repbuf(ctx->cc_sec, req, msgsize);
1670}
1671
1672/**
1673 * Used by ptlrpc client to free reply buffer of \a req. After this
1674 * req->rq_repmsg is set to NULL and should not be accessed anymore.
1675 */
1676void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
1677{
1678        struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1679        struct ptlrpc_sec_policy *policy;
1680
1681        LASSERT(ctx);
1682        LASSERT(ctx->cc_sec);
1683        LASSERT(ctx->cc_sec->ps_policy);
1684        LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1685
1686        if (req->rq_repbuf == NULL)
1687                return;
1688        LASSERT(req->rq_repbuf_len);
1689
1690        policy = ctx->cc_sec->ps_policy;
1691        policy->sp_cops->free_repbuf(ctx->cc_sec, req);
1692        req->rq_repmsg = NULL;
1693}
1694
1695int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
1696                                struct ptlrpc_cli_ctx *ctx)
1697{
1698        struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy;
1699
1700        if (!policy->sp_cops->install_rctx)
1701                return 0;
1702        return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx);
1703}
1704
1705int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
1706                                struct ptlrpc_svc_ctx *ctx)
1707{
1708        struct ptlrpc_sec_policy *policy = ctx->sc_policy;
1709
1710        if (!policy->sp_sops->install_rctx)
1711                return 0;
1712        return policy->sp_sops->install_rctx(imp, ctx);
1713}
1714
1715/****************************************
1716 * server side security          *
1717 ****************************************/
1718
1719static int flavor_allowed(struct sptlrpc_flavor *exp,
1720                          struct ptlrpc_request *req)
1721{
1722        struct sptlrpc_flavor *flvr = &req->rq_flvr;
1723
1724        if (exp->sf_rpc == SPTLRPC_FLVR_ANY || exp->sf_rpc == flvr->sf_rpc)
1725                return 1;
1726
1727        if ((req->rq_ctx_init || req->rq_ctx_fini) &&
1728            SPTLRPC_FLVR_POLICY(exp->sf_rpc) ==
1729            SPTLRPC_FLVR_POLICY(flvr->sf_rpc) &&
1730            SPTLRPC_FLVR_MECH(exp->sf_rpc) == SPTLRPC_FLVR_MECH(flvr->sf_rpc))
1731                return 1;
1732
1733        return 0;
1734}
1735
1736#define EXP_FLVR_UPDATE_EXPIRE      (OBD_TIMEOUT_DEFAULT + 10)
1737
1738/**
1739 * Given an export \a exp, check whether the flavor of incoming \a req
1740 * is allowed by the export \a exp. Main logic is about taking care of
1741 * changing configurations. Return 0 means success.
1742 */
1743int sptlrpc_target_export_check(struct obd_export *exp,
1744                                struct ptlrpc_request *req)
1745{
1746        struct sptlrpc_flavor   flavor;
1747
1748        if (exp == NULL)
1749                return 0;
1750
1751        /* client side export has no imp_reverse, skip
1752         * FIXME maybe we should check flavor this as well??? */
1753        if (exp->exp_imp_reverse == NULL)
1754                return 0;
1755
1756        /* don't care about ctx fini rpc */
1757        if (req->rq_ctx_fini)
1758                return 0;
1759
1760        spin_lock(&exp->exp_lock);
1761
1762        /* if flavor just changed (exp->exp_flvr_changed != 0), we wait for
1763         * the first req with the new flavor, then treat it as current flavor,
1764         * adapt reverse sec according to it.
1765         * note the first rpc with new flavor might not be with root ctx, in
1766         * which case delay the sec_adapt by leaving exp_flvr_adapt == 1. */
1767        if (unlikely(exp->exp_flvr_changed) &&
1768            flavor_allowed(&exp->exp_flvr_old[1], req)) {
1769                /* make the new flavor as "current", and old ones as
1770                 * about-to-expire */
1771                CDEBUG(D_SEC, "exp %p: just changed: %x->%x\n", exp,
1772                       exp->exp_flvr.sf_rpc, exp->exp_flvr_old[1].sf_rpc);
1773                flavor = exp->exp_flvr_old[1];
1774                exp->exp_flvr_old[1] = exp->exp_flvr_old[0];
1775                exp->exp_flvr_expire[1] = exp->exp_flvr_expire[0];
1776                exp->exp_flvr_old[0] = exp->exp_flvr;
1777                exp->exp_flvr_expire[0] = cfs_time_current_sec() +
1778                                          EXP_FLVR_UPDATE_EXPIRE;
1779                exp->exp_flvr = flavor;
1780
1781                /* flavor change finished */
1782                exp->exp_flvr_changed = 0;
1783                LASSERT(exp->exp_flvr_adapt == 1);
1784
1785                /* if it's gss, we only interested in root ctx init */
1786                if (req->rq_auth_gss &&
1787                    !(req->rq_ctx_init &&
1788                      (req->rq_auth_usr_root || req->rq_auth_usr_mdt ||
1789                       req->rq_auth_usr_ost))) {
1790                        spin_unlock(&exp->exp_lock);
1791                        CDEBUG(D_SEC, "is good but not root(%d:%d:%d:%d:%d)\n",
1792                               req->rq_auth_gss, req->rq_ctx_init,
1793                               req->rq_auth_usr_root, req->rq_auth_usr_mdt,
1794                               req->rq_auth_usr_ost);
1795                        return 0;
1796                }
1797
1798                exp->exp_flvr_adapt = 0;
1799                spin_unlock(&exp->exp_lock);
1800
1801                return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1802                                                req->rq_svc_ctx, &flavor);
1803        }
1804
1805        /* if it equals to the current flavor, we accept it, but need to
1806         * dealing with reverse sec/ctx */
1807        if (likely(flavor_allowed(&exp->exp_flvr, req))) {
1808                /* most cases should return here, we only interested in
1809                 * gss root ctx init */
1810                if (!req->rq_auth_gss || !req->rq_ctx_init ||
1811                    (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
1812                     !req->rq_auth_usr_ost)) {
1813                        spin_unlock(&exp->exp_lock);
1814                        return 0;
1815                }
1816
1817                /* if flavor just changed, we should not proceed, just leave
1818                 * it and current flavor will be discovered and replaced
1819                 * shortly, and let _this_ rpc pass through */
1820                if (exp->exp_flvr_changed) {
1821                        LASSERT(exp->exp_flvr_adapt);
1822                        spin_unlock(&exp->exp_lock);
1823                        return 0;
1824                }
1825
1826                if (exp->exp_flvr_adapt) {
1827                        exp->exp_flvr_adapt = 0;
1828                        CDEBUG(D_SEC, "exp %p (%x|%x|%x): do delayed adapt\n",
1829                               exp, exp->exp_flvr.sf_rpc,
1830                               exp->exp_flvr_old[0].sf_rpc,
1831                               exp->exp_flvr_old[1].sf_rpc);
1832                        flavor = exp->exp_flvr;
1833                        spin_unlock(&exp->exp_lock);
1834
1835                        return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1836                                                        req->rq_svc_ctx,
1837                                                        &flavor);
1838                } else {
1839                        CDEBUG(D_SEC, "exp %p (%x|%x|%x): is current flavor, "
1840                               "install rvs ctx\n", exp, exp->exp_flvr.sf_rpc,
1841                               exp->exp_flvr_old[0].sf_rpc,
1842                               exp->exp_flvr_old[1].sf_rpc);
1843                        spin_unlock(&exp->exp_lock);
1844
1845                        return sptlrpc_svc_install_rvs_ctx(exp->exp_imp_reverse,
1846                                                           req->rq_svc_ctx);
1847                }
1848        }
1849
1850        if (exp->exp_flvr_expire[0]) {
1851                if (exp->exp_flvr_expire[0] >= cfs_time_current_sec()) {
1852                        if (flavor_allowed(&exp->exp_flvr_old[0], req)) {
1853                                CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1854                                       "middle one ("CFS_DURATION_T")\n", exp,
1855                                       exp->exp_flvr.sf_rpc,
1856                                       exp->exp_flvr_old[0].sf_rpc,
1857                                       exp->exp_flvr_old[1].sf_rpc,
1858                                       exp->exp_flvr_expire[0] -
1859                                                cfs_time_current_sec());
1860                                spin_unlock(&exp->exp_lock);
1861                                return 0;
1862                        }
1863                } else {
1864                        CDEBUG(D_SEC, "mark middle expired\n");
1865                        exp->exp_flvr_expire[0] = 0;
1866                }
1867                CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match middle\n", exp,
1868                       exp->exp_flvr.sf_rpc,
1869                       exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1870                       req->rq_flvr.sf_rpc);
1871        }
1872
1873        /* now it doesn't match the current flavor, the only chance we can
1874         * accept it is match the old flavors which is not expired. */
1875        if (exp->exp_flvr_changed == 0 && exp->exp_flvr_expire[1]) {
1876                if (exp->exp_flvr_expire[1] >= cfs_time_current_sec()) {
1877                        if (flavor_allowed(&exp->exp_flvr_old[1], req)) {
1878                                CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1879                                       "oldest one ("CFS_DURATION_T")\n", exp,
1880                                       exp->exp_flvr.sf_rpc,
1881                                       exp->exp_flvr_old[0].sf_rpc,
1882                                       exp->exp_flvr_old[1].sf_rpc,
1883                                       exp->exp_flvr_expire[1] -
1884                                                cfs_time_current_sec());
1885                                spin_unlock(&exp->exp_lock);
1886                                return 0;
1887                        }
1888                } else {
1889                        CDEBUG(D_SEC, "mark oldest expired\n");
1890                        exp->exp_flvr_expire[1] = 0;
1891                }
1892                CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match found\n",
1893                       exp, exp->exp_flvr.sf_rpc,
1894                       exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1895                       req->rq_flvr.sf_rpc);
1896        } else {
1897                CDEBUG(D_SEC, "exp %p (%x|%x|%x): skip the last one\n",
1898                       exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc,
1899                       exp->exp_flvr_old[1].sf_rpc);
1900        }
1901
1902        spin_unlock(&exp->exp_lock);
1903
1904        CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with "
1905              "unauthorized flavor %x, expect %x|%x(%+ld)|%x(%+ld)\n",
1906              exp, exp->exp_obd->obd_name,
1907              req, req->rq_auth_gss, req->rq_ctx_init, req->rq_ctx_fini,
1908              req->rq_auth_usr_root, req->rq_auth_usr_mdt, req->rq_auth_usr_ost,
1909              req->rq_flvr.sf_rpc,
1910              exp->exp_flvr.sf_rpc,
1911              exp->exp_flvr_old[0].sf_rpc,
1912              exp->exp_flvr_expire[0] ?
1913              (unsigned long) (exp->exp_flvr_expire[0] -
1914                               cfs_time_current_sec()) : 0,
1915              exp->exp_flvr_old[1].sf_rpc,
1916              exp->exp_flvr_expire[1] ?
1917              (unsigned long) (exp->exp_flvr_expire[1] -
1918                               cfs_time_current_sec()) : 0);
1919        return -EACCES;
1920}
1921EXPORT_SYMBOL(sptlrpc_target_export_check);
1922
1923void sptlrpc_target_update_exp_flavor(struct obd_device *obd,
1924                                      struct sptlrpc_rule_set *rset)
1925{
1926        struct obd_export       *exp;
1927        struct sptlrpc_flavor    new_flvr;
1928
1929        LASSERT(obd);
1930
1931        spin_lock(&obd->obd_dev_lock);
1932
1933        list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1934                if (exp->exp_connection == NULL)
1935                        continue;
1936
1937                /* note if this export had just been updated flavor
1938                 * (exp_flvr_changed == 1), this will override the
1939                 * previous one. */
1940                spin_lock(&exp->exp_lock);
1941                sptlrpc_target_choose_flavor(rset, exp->exp_sp_peer,
1942                                             exp->exp_connection->c_peer.nid,
1943                                             &new_flvr);
1944                if (exp->exp_flvr_changed ||
1945                    !flavor_equal(&new_flvr, &exp->exp_flvr)) {
1946                        exp->exp_flvr_old[1] = new_flvr;
1947                        exp->exp_flvr_expire[1] = 0;
1948                        exp->exp_flvr_changed = 1;
1949                        exp->exp_flvr_adapt = 1;
1950
1951                        CDEBUG(D_SEC, "exp %p (%s): updated flavor %x->%x\n",
1952                               exp, sptlrpc_part2name(exp->exp_sp_peer),
1953                               exp->exp_flvr.sf_rpc,
1954                               exp->exp_flvr_old[1].sf_rpc);
1955                }
1956                spin_unlock(&exp->exp_lock);
1957        }
1958
1959        spin_unlock(&obd->obd_dev_lock);
1960}
1961EXPORT_SYMBOL(sptlrpc_target_update_exp_flavor);
1962
1963static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc)
1964{
1965        /* peer's claim is unreliable unless gss is being used */
1966        if (!req->rq_auth_gss || svc_rc == SECSVC_DROP)
1967                return svc_rc;
1968
1969        switch (req->rq_sp_from) {
1970        case LUSTRE_SP_CLI:
1971                if (req->rq_auth_usr_mdt || req->rq_auth_usr_ost) {
1972                        DEBUG_REQ(D_ERROR, req, "faked source CLI");
1973                        svc_rc = SECSVC_DROP;
1974                }
1975                break;
1976        case LUSTRE_SP_MDT:
1977                if (!req->rq_auth_usr_mdt) {
1978                        DEBUG_REQ(D_ERROR, req, "faked source MDT");
1979                        svc_rc = SECSVC_DROP;
1980                }
1981                break;
1982        case LUSTRE_SP_OST:
1983                if (!req->rq_auth_usr_ost) {
1984                        DEBUG_REQ(D_ERROR, req, "faked source OST");
1985                        svc_rc = SECSVC_DROP;
1986                }
1987                break;
1988        case LUSTRE_SP_MGS:
1989        case LUSTRE_SP_MGC:
1990                if (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
1991                    !req->rq_auth_usr_ost) {
1992                        DEBUG_REQ(D_ERROR, req, "faked source MGC/MGS");
1993                        svc_rc = SECSVC_DROP;
1994                }
1995                break;
1996        case LUSTRE_SP_ANY:
1997        default:
1998                DEBUG_REQ(D_ERROR, req, "invalid source %u", req->rq_sp_from);
1999                svc_rc = SECSVC_DROP;
2000        }
2001
2002        return svc_rc;
2003}
2004
2005/**
2006 * Used by ptlrpc server, to perform transformation upon request message of
2007 * incoming \a req. This must be the first thing to do with a incoming
2008 * request in ptlrpc layer.
2009 *
2010 * \retval SECSVC_OK success, and req->rq_reqmsg point to request message in
2011 * clear text, size is req->rq_reqlen; also req->rq_svc_ctx is set.
2012 * \retval SECSVC_COMPLETE success, the request has been fully processed, and
2013 * reply message has been prepared.
2014 * \retval SECSVC_DROP failed, this request should be dropped.
2015 */
2016int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
2017{
2018        struct ptlrpc_sec_policy *policy;
2019        struct lustre_msg       *msg = req->rq_reqbuf;
2020        int                    rc;
2021
2022        LASSERT(msg);
2023        LASSERT(req->rq_reqmsg == NULL);
2024        LASSERT(req->rq_repmsg == NULL);
2025        LASSERT(req->rq_svc_ctx == NULL);
2026
2027        req->rq_req_swab_mask = 0;
2028
2029        rc = __lustre_unpack_msg(msg, req->rq_reqdata_len);
2030        switch (rc) {
2031        case 1:
2032                lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
2033        case 0:
2034                break;
2035        default:
2036                CERROR("error unpacking request from %s x"LPU64"\n",
2037                       libcfs_id2str(req->rq_peer), req->rq_xid);
2038                return SECSVC_DROP;
2039        }
2040
2041        req->rq_flvr.sf_rpc = WIRE_FLVR(msg->lm_secflvr);
2042        req->rq_sp_from = LUSTRE_SP_ANY;
2043        req->rq_auth_uid = -1;
2044        req->rq_auth_mapped_uid = -1;
2045
2046        policy = sptlrpc_wireflavor2policy(req->rq_flvr.sf_rpc);
2047        if (!policy) {
2048                CERROR("unsupported rpc flavor %x\n", req->rq_flvr.sf_rpc);
2049                return SECSVC_DROP;
2050        }
2051
2052        LASSERT(policy->sp_sops->accept);
2053        rc = policy->sp_sops->accept(req);
2054        sptlrpc_policy_put(policy);
2055        LASSERT(req->rq_reqmsg || rc != SECSVC_OK);
2056        LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
2057
2058        /*
2059         * if it's not null flavor (which means embedded packing msg),
2060         * reset the swab mask for the comming inner msg unpacking.
2061         */
2062        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL)
2063                req->rq_req_swab_mask = 0;
2064
2065        /* sanity check for the request source */
2066        rc = sptlrpc_svc_check_from(req, rc);
2067        return rc;
2068}
2069
2070/**
2071 * Used by ptlrpc server, to allocate reply buffer for \a req. If succeed,
2072 * req->rq_reply_state is set, and req->rq_reply_state->rs_msg point to
2073 * a buffer of \a msglen size.
2074 */
2075int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
2076{
2077        struct ptlrpc_sec_policy *policy;
2078        struct ptlrpc_reply_state *rs;
2079        int rc;
2080
2081        LASSERT(req->rq_svc_ctx);
2082        LASSERT(req->rq_svc_ctx->sc_policy);
2083
2084        policy = req->rq_svc_ctx->sc_policy;
2085        LASSERT(policy->sp_sops->alloc_rs);
2086
2087        rc = policy->sp_sops->alloc_rs(req, msglen);
2088        if (unlikely(rc == -ENOMEM)) {
2089                /* failed alloc, try emergency pool */
2090                rs = lustre_get_emerg_rs(req->rq_rqbd->rqbd_svcpt);
2091                if (rs == NULL)
2092                        return -ENOMEM;
2093
2094                req->rq_reply_state = rs;
2095                rc = policy->sp_sops->alloc_rs(req, msglen);
2096                if (rc) {
2097                        lustre_put_emerg_rs(rs);
2098                        req->rq_reply_state = NULL;
2099                }
2100        }
2101
2102        LASSERT(rc != 0 ||
2103                (req->rq_reply_state && req->rq_reply_state->rs_msg));
2104
2105        return rc;
2106}
2107
2108/**
2109 * Used by ptlrpc server, to perform transformation upon reply message.
2110 *
2111 * \post req->rq_reply_off is set to approriate server-controlled reply offset.
2112 * \post req->rq_repmsg and req->rq_reply_state->rs_msg becomes inaccessible.
2113 */
2114int sptlrpc_svc_wrap_reply(struct ptlrpc_request *req)
2115{
2116        struct ptlrpc_sec_policy *policy;
2117        int rc;
2118
2119        LASSERT(req->rq_svc_ctx);
2120        LASSERT(req->rq_svc_ctx->sc_policy);
2121
2122        policy = req->rq_svc_ctx->sc_policy;
2123        LASSERT(policy->sp_sops->authorize);
2124
2125        rc = policy->sp_sops->authorize(req);
2126        LASSERT(rc || req->rq_reply_state->rs_repdata_len);
2127
2128        return rc;
2129}
2130
2131/**
2132 * Used by ptlrpc server, to free reply_state.
2133 */
2134void sptlrpc_svc_free_rs(struct ptlrpc_reply_state *rs)
2135{
2136        struct ptlrpc_sec_policy *policy;
2137        unsigned int prealloc;
2138
2139        LASSERT(rs->rs_svc_ctx);
2140        LASSERT(rs->rs_svc_ctx->sc_policy);
2141
2142        policy = rs->rs_svc_ctx->sc_policy;
2143        LASSERT(policy->sp_sops->free_rs);
2144
2145        prealloc = rs->rs_prealloc;
2146        policy->sp_sops->free_rs(rs);
2147
2148        if (prealloc)
2149                lustre_put_emerg_rs(rs);
2150}
2151
2152void sptlrpc_svc_ctx_addref(struct ptlrpc_request *req)
2153{
2154        struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2155
2156        if (ctx != NULL)
2157                atomic_inc(&ctx->sc_refcount);
2158}
2159
2160void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req)
2161{
2162        struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2163
2164        if (ctx == NULL)
2165                return;
2166
2167        LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2168        if (atomic_dec_and_test(&ctx->sc_refcount)) {
2169                if (ctx->sc_policy->sp_sops->free_ctx)
2170                        ctx->sc_policy->sp_sops->free_ctx(ctx);
2171        }
2172        req->rq_svc_ctx = NULL;
2173}
2174
2175void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req)
2176{
2177        struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2178
2179        if (ctx == NULL)
2180                return;
2181
2182        LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2183        if (ctx->sc_policy->sp_sops->invalidate_ctx)
2184                ctx->sc_policy->sp_sops->invalidate_ctx(ctx);
2185}
2186EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate);
2187
2188/****************************************
2189 * bulk security                        *
2190 ****************************************/
2191
2192/**
2193 * Perform transformation upon bulk data pointed by \a desc. This is called
2194 * before transforming the request message.
2195 */
2196int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
2197                          struct ptlrpc_bulk_desc *desc)
2198{
2199        struct ptlrpc_cli_ctx *ctx;
2200
2201        LASSERT(req->rq_bulk_read || req->rq_bulk_write);
2202
2203        if (!req->rq_pack_bulk)
2204                return 0;
2205
2206        ctx = req->rq_cli_ctx;
2207        if (ctx->cc_ops->wrap_bulk)
2208                return ctx->cc_ops->wrap_bulk(ctx, req, desc);
2209        return 0;
2210}
2211EXPORT_SYMBOL(sptlrpc_cli_wrap_bulk);
2212
2213/**
2214 * This is called after unwrap the reply message.
2215 * return nob of actual plain text size received, or error code.
2216 */
2217int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
2218                                 struct ptlrpc_bulk_desc *desc,
2219                                 int nob)
2220{
2221        struct ptlrpc_cli_ctx  *ctx;
2222        int                  rc;
2223
2224        LASSERT(req->rq_bulk_read && !req->rq_bulk_write);
2225
2226        if (!req->rq_pack_bulk)
2227                return desc->bd_nob_transferred;
2228
2229        ctx = req->rq_cli_ctx;
2230        if (ctx->cc_ops->unwrap_bulk) {
2231                rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2232                if (rc < 0)
2233                        return rc;
2234        }
2235        return desc->bd_nob_transferred;
2236}
2237EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_read);
2238
2239/**
2240 * This is called after unwrap the reply message.
2241 * return 0 for success or error code.
2242 */
2243int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
2244                                  struct ptlrpc_bulk_desc *desc)
2245{
2246        struct ptlrpc_cli_ctx  *ctx;
2247        int                  rc;
2248
2249        LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
2250
2251        if (!req->rq_pack_bulk)
2252                return 0;
2253
2254        ctx = req->rq_cli_ctx;
2255        if (ctx->cc_ops->unwrap_bulk) {
2256                rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2257                if (rc < 0)
2258                        return rc;
2259        }
2260
2261        /*
2262         * if everything is going right, nob should equals to nob_transferred.
2263         * in case of privacy mode, nob_transferred needs to be adjusted.
2264         */
2265        if (desc->bd_nob != desc->bd_nob_transferred) {
2266                CERROR("nob %d doesn't match transferred nob %d",
2267                       desc->bd_nob, desc->bd_nob_transferred);
2268                return -EPROTO;
2269        }
2270
2271        return 0;
2272}
2273EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write);
2274
2275
2276/****************************************
2277 * user descriptor helpers            *
2278 ****************************************/
2279
2280int sptlrpc_current_user_desc_size(void)
2281{
2282        int ngroups;
2283
2284        ngroups = current_ngroups;
2285
2286        if (ngroups > LUSTRE_MAX_GROUPS)
2287                ngroups = LUSTRE_MAX_GROUPS;
2288        return sptlrpc_user_desc_size(ngroups);
2289}
2290EXPORT_SYMBOL(sptlrpc_current_user_desc_size);
2291
2292int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset)
2293{
2294        struct ptlrpc_user_desc *pud;
2295
2296        pud = lustre_msg_buf(msg, offset, 0);
2297
2298        pud->pud_uid = from_kuid(&init_user_ns, current_uid());
2299        pud->pud_gid = from_kgid(&init_user_ns, current_gid());
2300        pud->pud_fsuid = from_kuid(&init_user_ns, current_fsuid());
2301        pud->pud_fsgid = from_kgid(&init_user_ns, current_fsgid());
2302        pud->pud_cap = cfs_curproc_cap_pack();
2303        pud->pud_ngroups = (msg->lm_buflens[offset] - sizeof(*pud)) / 4;
2304
2305        task_lock(current);
2306        if (pud->pud_ngroups > current_ngroups)
2307                pud->pud_ngroups = current_ngroups;
2308        memcpy(pud->pud_groups, current_cred()->group_info->blocks[0],
2309               pud->pud_ngroups * sizeof(__u32));
2310        task_unlock(current);
2311
2312        return 0;
2313}
2314EXPORT_SYMBOL(sptlrpc_pack_user_desc);
2315
2316int sptlrpc_unpack_user_desc(struct lustre_msg *msg, int offset, int swabbed)
2317{
2318        struct ptlrpc_user_desc *pud;
2319        int                   i;
2320
2321        pud = lustre_msg_buf(msg, offset, sizeof(*pud));
2322        if (!pud)
2323                return -EINVAL;
2324
2325        if (swabbed) {
2326                __swab32s(&pud->pud_uid);
2327                __swab32s(&pud->pud_gid);
2328                __swab32s(&pud->pud_fsuid);
2329                __swab32s(&pud->pud_fsgid);
2330                __swab32s(&pud->pud_cap);
2331                __swab32s(&pud->pud_ngroups);
2332        }
2333
2334        if (pud->pud_ngroups > LUSTRE_MAX_GROUPS) {
2335                CERROR("%u groups is too large\n", pud->pud_ngroups);
2336                return -EINVAL;
2337        }
2338
2339        if (sizeof(*pud) + pud->pud_ngroups * sizeof(__u32) >
2340            msg->lm_buflens[offset]) {
2341                CERROR("%u groups are claimed but bufsize only %u\n",
2342                       pud->pud_ngroups, msg->lm_buflens[offset]);
2343                return -EINVAL;
2344        }
2345
2346        if (swabbed) {
2347                for (i = 0; i < pud->pud_ngroups; i++)
2348                        __swab32s(&pud->pud_groups[i]);
2349        }
2350
2351        return 0;
2352}
2353EXPORT_SYMBOL(sptlrpc_unpack_user_desc);
2354
2355/****************************************
2356 * misc helpers                  *
2357 ****************************************/
2358
2359const char * sec2target_str(struct ptlrpc_sec *sec)
2360{
2361        if (!sec || !sec->ps_import || !sec->ps_import->imp_obd)
2362                return "*";
2363        if (sec_is_reverse(sec))
2364                return "c";
2365        return obd_uuid2str(&sec->ps_import->imp_obd->u.cli.cl_target_uuid);
2366}
2367EXPORT_SYMBOL(sec2target_str);
2368
2369/*
2370 * return true if the bulk data is protected
2371 */
2372int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr)
2373{
2374        switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
2375        case SPTLRPC_BULK_SVC_INTG:
2376        case SPTLRPC_BULK_SVC_PRIV:
2377                return 1;
2378        default:
2379                return 0;
2380        }
2381}
2382EXPORT_SYMBOL(sptlrpc_flavor_has_bulk);
2383
2384/****************************************
2385 * crypto API helper/alloc blkciper     *
2386 ****************************************/
2387
2388/****************************************
2389 * initialize/finalize            *
2390 ****************************************/
2391
2392int sptlrpc_init(void)
2393{
2394        int rc;
2395
2396        rwlock_init(&policy_lock);
2397
2398        rc = sptlrpc_gc_init();
2399        if (rc)
2400                goto out;
2401
2402        rc = sptlrpc_conf_init();
2403        if (rc)
2404                goto out_gc;
2405
2406        rc = sptlrpc_enc_pool_init();
2407        if (rc)
2408                goto out_conf;
2409
2410        rc = sptlrpc_null_init();
2411        if (rc)
2412                goto out_pool;
2413
2414        rc = sptlrpc_plain_init();
2415        if (rc)
2416                goto out_null;
2417
2418        rc = sptlrpc_lproc_init();
2419        if (rc)
2420                goto out_plain;
2421
2422        return 0;
2423
2424out_plain:
2425        sptlrpc_plain_fini();
2426out_null:
2427        sptlrpc_null_fini();
2428out_pool:
2429        sptlrpc_enc_pool_fini();
2430out_conf:
2431        sptlrpc_conf_fini();
2432out_gc:
2433        sptlrpc_gc_fini();
2434out:
2435        return rc;
2436}
2437
2438void sptlrpc_fini(void)
2439{
2440        sptlrpc_lproc_fini();
2441        sptlrpc_plain_fini();
2442        sptlrpc_null_fini();
2443        sptlrpc_enc_pool_fini();
2444        sptlrpc_conf_fini();
2445        sptlrpc_gc_fini();
2446}
2447