linux/fs/nfs/objlayout/objio_osd.c
<<
>>
Prefs
   1/*
   2 *  pNFS Objects layout implementation over open-osd initiator library
   3 *
   4 *  Copyright (C) 2009 Panasas Inc. [year of first publication]
   5 *  All rights reserved.
   6 *
   7 *  Benny Halevy <bhalevy@panasas.com>
   8 *  Boaz Harrosh <bharrosh@panasas.com>
   9 *
  10 *  This program is free software; you can redistribute it and/or modify
  11 *  it under the terms of the GNU General Public License version 2
  12 *  See the file COPYING included with this distribution for more details.
  13 *
  14 *  Redistribution and use in source and binary forms, with or without
  15 *  modification, are permitted provided that the following conditions
  16 *  are met:
  17 *
  18 *  1. Redistributions of source code must retain the above copyright
  19 *     notice, this list of conditions and the following disclaimer.
  20 *  2. Redistributions in binary form must reproduce the above copyright
  21 *     notice, this list of conditions and the following disclaimer in the
  22 *     documentation and/or other materials provided with the distribution.
  23 *  3. Neither the name of the Panasas company nor the names of its
  24 *     contributors may be used to endorse or promote products derived
  25 *     from this software without specific prior written permission.
  26 *
  27 *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
  28 *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  29 *  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  30 *  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
  31 *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  32 *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  33 *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
  34 *  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  35 *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  36 *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  37 *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38 */
  39
  40#include <linux/module.h>
  41#include <scsi/osd_initiator.h>
  42
  43#include "objlayout.h"
  44
  45#define NFSDBG_FACILITY         NFSDBG_PNFS_LD
  46
  47#define _LLU(x) ((unsigned long long)x)
  48
/*
 * Number of struct bio_vec entries that fit in one page next to a struct bio.
 * Used in this file as the upper bound on the vector count requested from
 * bio_kmalloc() and in the max_io_size calculation.
 */
enum { BIO_MAX_PAGES_KMALLOC =
                (PAGE_SIZE - sizeof(struct bio)) / sizeof(struct bio_vec),
};
  52
/*
 * One cached OSD device: the generic NFSv4 device-id node plus the osd_dev
 * handle it maps to.  Code in this file converts between the two with
 * container_of() on id_node.
 */
struct objio_dev_ent {
        struct nfs4_deviceid_node id_node;      /* embedded device-id node */
        struct osd_dev *od;                     /* handle passed to the osd library */
};
  57
  58static void
  59objio_free_deviceid_node(struct nfs4_deviceid_node *d)
  60{
  61        struct objio_dev_ent *de = container_of(d, struct objio_dev_ent, id_node);
  62
  63        dprintk("%s: free od=%p\n", __func__, de->od);
  64        osduld_put_device(de->od);
  65        kfree(de);
  66}
  67
  68static struct objio_dev_ent *_dev_list_find(const struct nfs_server *nfss,
  69        const struct nfs4_deviceid *d_id)
  70{
  71        struct nfs4_deviceid_node *d;
  72        struct objio_dev_ent *de;
  73
  74        d = nfs4_find_get_deviceid(nfss->pnfs_curr_ld, nfss->nfs_client, d_id);
  75        if (!d)
  76                return NULL;
  77
  78        de = container_of(d, struct objio_dev_ent, id_node);
  79        return de;
  80}
  81
  82static struct objio_dev_ent *
  83_dev_list_add(const struct nfs_server *nfss,
  84        const struct nfs4_deviceid *d_id, struct osd_dev *od,
  85        gfp_t gfp_flags)
  86{
  87        struct nfs4_deviceid_node *d;
  88        struct objio_dev_ent *de = kzalloc(sizeof(*de), gfp_flags);
  89        struct objio_dev_ent *n;
  90
  91        if (!de) {
  92                dprintk("%s: -ENOMEM od=%p\n", __func__, od);
  93                return NULL;
  94        }
  95
  96        dprintk("%s: Adding od=%p\n", __func__, od);
  97        nfs4_init_deviceid_node(&de->id_node,
  98                                nfss->pnfs_curr_ld,
  99                                nfss->nfs_client,
 100                                d_id);
 101        de->od = od;
 102
 103        d = nfs4_insert_deviceid_node(&de->id_node);
 104        n = container_of(d, struct objio_dev_ent, id_node);
 105        if (n != de) {
 106                dprintk("%s: Race with other n->od=%p\n", __func__, n->od);
 107                objio_free_deviceid_node(&de->id_node);
 108                de = n;
 109        }
 110
 111        return de;
 112}
 113
/*
 * Per-component backing storage for a credential's key and capability bytes.
 * copy_single_comp() copies the decoded bytes here and repoints the stored
 * credential at these arrays.
 */
struct caps_buffers {
        u8 caps_key[OSD_CRYPTO_KEYID_SIZE];     /* backs oc_cap_key.cred */
        u8 creds[OSD_CAP_LEN];                  /* backs oc_cap.cred */
};
 118
 119struct objio_segment {
 120        struct pnfs_layout_segment lseg;
 121
 122        struct pnfs_osd_object_cred *comps;
 123
 124        unsigned mirrors_p1;
 125        unsigned stripe_unit;
 126        unsigned group_width;   /* Data stripe_units without integrity comps */
 127        u64 group_depth;
 128        unsigned group_count;
 129
 130        unsigned max_io_size;
 131
 132        unsigned comps_index;
 133        unsigned num_comps;
 134        /* variable length */
 135        struct objio_dev_ent *ods[];
 136};
 137
 138static inline struct objio_segment *
 139OBJIO_LSEG(struct pnfs_layout_segment *lseg)
 140{
 141        return container_of(lseg, struct objio_segment, lseg);
 142}
 143
struct objio_state;
/*
 * Completion callback for an objio_state.  _last_io() calls it when the last
 * kref is dropped; in sync mode _io_exec() calls the saved callback itself
 * and returns its result.
 */
typedef ssize_t (*objio_done_fn)(struct objio_state *ios);
 146
 147struct objio_state {
 148        /* Generic layer */
 149        struct objlayout_io_state ol_state;
 150
 151        struct objio_segment *layout;
 152
 153        struct kref kref;
 154        objio_done_fn done;
 155        void *private;
 156
 157        unsigned long length;
 158        unsigned numdevs; /* Actually used devs in this IO */
 159        /* A per-device variable array of size numdevs */
 160        struct _objio_per_comp {
 161                struct bio *bio;
 162                struct osd_request *or;
 163                unsigned long length;
 164                u64 offset;
 165                unsigned dev;
 166        } per_dev[];
 167};
 168
 169/* Send and wait for a get_device_info of devices in the layout,
 170   then look them up with the osd_initiator library */
 171static struct objio_dev_ent *_device_lookup(struct pnfs_layout_hdr *pnfslay,
 172                                struct objio_segment *objio_seg, unsigned comp,
 173                                gfp_t gfp_flags)
 174{
 175        struct pnfs_osd_deviceaddr *deviceaddr;
 176        struct nfs4_deviceid *d_id;
 177        struct objio_dev_ent *ode;
 178        struct osd_dev *od;
 179        struct osd_dev_info odi;
 180        int err;
 181
 182        d_id = &objio_seg->comps[comp].oc_object_id.oid_device_id;
 183
 184        ode = _dev_list_find(NFS_SERVER(pnfslay->plh_inode), d_id);
 185        if (ode)
 186                return ode;
 187
 188        err = objlayout_get_deviceinfo(pnfslay, d_id, &deviceaddr, gfp_flags);
 189        if (unlikely(err)) {
 190                dprintk("%s: objlayout_get_deviceinfo dev(%llx:%llx) =>%d\n",
 191                        __func__, _DEVID_LO(d_id), _DEVID_HI(d_id), err);
 192                return ERR_PTR(err);
 193        }
 194
 195        odi.systemid_len = deviceaddr->oda_systemid.len;
 196        if (odi.systemid_len > sizeof(odi.systemid)) {
 197                err = -EINVAL;
 198                goto out;
 199        } else if (odi.systemid_len)
 200                memcpy(odi.systemid, deviceaddr->oda_systemid.data,
 201                       odi.systemid_len);
 202        odi.osdname_len  = deviceaddr->oda_osdname.len;
 203        odi.osdname      = (u8 *)deviceaddr->oda_osdname.data;
 204
 205        if (!odi.osdname_len && !odi.systemid_len) {
 206                dprintk("%s: !odi.osdname_len && !odi.systemid_len\n",
 207                        __func__);
 208                err = -ENODEV;
 209                goto out;
 210        }
 211
 212        od = osduld_info_lookup(&odi);
 213        if (unlikely(IS_ERR(od))) {
 214                err = PTR_ERR(od);
 215                dprintk("%s: osduld_info_lookup => %d\n", __func__, err);
 216                goto out;
 217        }
 218
 219        ode = _dev_list_add(NFS_SERVER(pnfslay->plh_inode), d_id, od,
 220                            gfp_flags);
 221
 222out:
 223        dprintk("%s: return=%d\n", __func__, err);
 224        objlayout_put_deviceinfo(deviceaddr);
 225        return err ? ERR_PTR(err) : ode;
 226}
 227
 228static int objio_devices_lookup(struct pnfs_layout_hdr *pnfslay,
 229        struct objio_segment *objio_seg,
 230        gfp_t gfp_flags)
 231{
 232        unsigned i;
 233        int err;
 234
 235        /* lookup all devices */
 236        for (i = 0; i < objio_seg->num_comps; i++) {
 237                struct objio_dev_ent *ode;
 238
 239                ode = _device_lookup(pnfslay, objio_seg, i, gfp_flags);
 240                if (unlikely(IS_ERR(ode))) {
 241                        err = PTR_ERR(ode);
 242                        goto out;
 243                }
 244                objio_seg->ods[i] = ode;
 245        }
 246        err = 0;
 247
 248out:
 249        dprintk("%s: return=%d\n", __func__, err);
 250        return err;
 251}
 252
 253static int _verify_data_map(struct pnfs_osd_layout *layout)
 254{
 255        struct pnfs_osd_data_map *data_map = &layout->olo_map;
 256        u64 stripe_length;
 257        u32 group_width;
 258
 259/* FIXME: Only raid0 for now. if not go through MDS */
 260        if (data_map->odm_raid_algorithm != PNFS_OSD_RAID_0) {
 261                printk(KERN_ERR "Only RAID_0 for now\n");
 262                return -ENOTSUPP;
 263        }
 264        if (0 != (data_map->odm_num_comps % (data_map->odm_mirror_cnt + 1))) {
 265                printk(KERN_ERR "Data Map wrong, num_comps=%u mirrors=%u\n",
 266                          data_map->odm_num_comps, data_map->odm_mirror_cnt);
 267                return -EINVAL;
 268        }
 269
 270        if (data_map->odm_group_width)
 271                group_width = data_map->odm_group_width;
 272        else
 273                group_width = data_map->odm_num_comps /
 274                                                (data_map->odm_mirror_cnt + 1);
 275
 276        stripe_length = (u64)data_map->odm_stripe_unit * group_width;
 277        if (stripe_length >= (1ULL << 32)) {
 278                printk(KERN_ERR "Total Stripe length(0x%llx)"
 279                          " >= 32bit is not supported\n", _LLU(stripe_length));
 280                return -ENOTSUPP;
 281        }
 282
 283        if (0 != (data_map->odm_stripe_unit & ~PAGE_MASK)) {
 284                printk(KERN_ERR "Stripe Unit(0x%llx)"
 285                          " must be Multples of PAGE_SIZE(0x%lx)\n",
 286                          _LLU(data_map->odm_stripe_unit), PAGE_SIZE);
 287                return -ENOTSUPP;
 288        }
 289
 290        return 0;
 291}
 292
 293static void copy_single_comp(struct pnfs_osd_object_cred *cur_comp,
 294                             struct pnfs_osd_object_cred *src_comp,
 295                             struct caps_buffers *caps_p)
 296{
 297        WARN_ON(src_comp->oc_cap_key.cred_len > sizeof(caps_p->caps_key));
 298        WARN_ON(src_comp->oc_cap.cred_len > sizeof(caps_p->creds));
 299
 300        *cur_comp = *src_comp;
 301
 302        memcpy(caps_p->caps_key, src_comp->oc_cap_key.cred,
 303               sizeof(caps_p->caps_key));
 304        cur_comp->oc_cap_key.cred = caps_p->caps_key;
 305
 306        memcpy(caps_p->creds, src_comp->oc_cap.cred,
 307               sizeof(caps_p->creds));
 308        cur_comp->oc_cap.cred = caps_p->creds;
 309}
 310
 311int objio_alloc_lseg(struct pnfs_layout_segment **outp,
 312        struct pnfs_layout_hdr *pnfslay,
 313        struct pnfs_layout_range *range,
 314        struct xdr_stream *xdr,
 315        gfp_t gfp_flags)
 316{
 317        struct objio_segment *objio_seg;
 318        struct pnfs_osd_xdr_decode_layout_iter iter;
 319        struct pnfs_osd_layout layout;
 320        struct pnfs_osd_object_cred *cur_comp, src_comp;
 321        struct caps_buffers *caps_p;
 322        int err;
 323
 324        err = pnfs_osd_xdr_decode_layout_map(&layout, &iter, xdr);
 325        if (unlikely(err))
 326                return err;
 327
 328        err = _verify_data_map(&layout);
 329        if (unlikely(err))
 330                return err;
 331
 332        objio_seg = kzalloc(sizeof(*objio_seg) +
 333                            sizeof(objio_seg->ods[0]) * layout.olo_num_comps +
 334                            sizeof(*objio_seg->comps) * layout.olo_num_comps +
 335                            sizeof(struct caps_buffers) * layout.olo_num_comps,
 336                            gfp_flags);
 337        if (!objio_seg)
 338                return -ENOMEM;
 339
 340        objio_seg->comps = (void *)(objio_seg->ods + layout.olo_num_comps);
 341        cur_comp = objio_seg->comps;
 342        caps_p = (void *)(cur_comp + layout.olo_num_comps);
 343        while (pnfs_osd_xdr_decode_layout_comp(&src_comp, &iter, xdr, &err))
 344                copy_single_comp(cur_comp++, &src_comp, caps_p++);
 345        if (unlikely(err))
 346                goto err;
 347
 348        objio_seg->num_comps = layout.olo_num_comps;
 349        objio_seg->comps_index = layout.olo_comps_index;
 350        err = objio_devices_lookup(pnfslay, objio_seg, gfp_flags);
 351        if (err)
 352                goto err;
 353
 354        objio_seg->mirrors_p1 = layout.olo_map.odm_mirror_cnt + 1;
 355        objio_seg->stripe_unit = layout.olo_map.odm_stripe_unit;
 356        if (layout.olo_map.odm_group_width) {
 357                objio_seg->group_width = layout.olo_map.odm_group_width;
 358                objio_seg->group_depth = layout.olo_map.odm_group_depth;
 359                objio_seg->group_count = layout.olo_map.odm_num_comps /
 360                                                objio_seg->mirrors_p1 /
 361                                                objio_seg->group_width;
 362        } else {
 363                objio_seg->group_width = layout.olo_map.odm_num_comps /
 364                                                objio_seg->mirrors_p1;
 365                objio_seg->group_depth = -1;
 366                objio_seg->group_count = 1;
 367        }
 368
 369        /* Cache this calculation it will hit for every page */
 370        objio_seg->max_io_size = (BIO_MAX_PAGES_KMALLOC * PAGE_SIZE -
 371                                  objio_seg->stripe_unit) *
 372                                 objio_seg->group_width;
 373
 374        *outp = &objio_seg->lseg;
 375        return 0;
 376
 377err:
 378        kfree(objio_seg);
 379        dprintk("%s: Error: return %d\n", __func__, err);
 380        *outp = NULL;
 381        return err;
 382}
 383
 384void objio_free_lseg(struct pnfs_layout_segment *lseg)
 385{
 386        int i;
 387        struct objio_segment *objio_seg = OBJIO_LSEG(lseg);
 388
 389        for (i = 0; i < objio_seg->num_comps; i++) {
 390                if (!objio_seg->ods[i])
 391                        break;
 392                nfs4_put_deviceid_node(&objio_seg->ods[i]->id_node);
 393        }
 394        kfree(objio_seg);
 395}
 396
 397int objio_alloc_io_state(struct pnfs_layout_segment *lseg,
 398                         struct objlayout_io_state **outp,
 399                         gfp_t gfp_flags)
 400{
 401        struct objio_segment *objio_seg = OBJIO_LSEG(lseg);
 402        struct objio_state *ios;
 403        const unsigned first_size = sizeof(*ios) +
 404                                objio_seg->num_comps * sizeof(ios->per_dev[0]);
 405        const unsigned sec_size = objio_seg->num_comps *
 406                                                sizeof(ios->ol_state.ioerrs[0]);
 407
 408        ios = kzalloc(first_size + sec_size, gfp_flags);
 409        if (unlikely(!ios))
 410                return -ENOMEM;
 411
 412        ios->layout = objio_seg;
 413        ios->ol_state.ioerrs = ((void *)ios) + first_size;
 414        ios->ol_state.num_comps = objio_seg->num_comps;
 415
 416        *outp = &ios->ol_state;
 417        return 0;
 418}
 419
 420void objio_free_io_state(struct objlayout_io_state *ol_state)
 421{
 422        struct objio_state *ios = container_of(ol_state, struct objio_state,
 423                                               ol_state);
 424
 425        kfree(ios);
 426}
 427
 428enum pnfs_osd_errno osd_pri_2_pnfs_err(enum osd_err_priority oep)
 429{
 430        switch (oep) {
 431        case OSD_ERR_PRI_NO_ERROR:
 432                return (enum pnfs_osd_errno)0;
 433
 434        case OSD_ERR_PRI_CLEAR_PAGES:
 435                BUG_ON(1);
 436                return 0;
 437
 438        case OSD_ERR_PRI_RESOURCE:
 439                return PNFS_OSD_ERR_RESOURCE;
 440        case OSD_ERR_PRI_BAD_CRED:
 441                return PNFS_OSD_ERR_BAD_CRED;
 442        case OSD_ERR_PRI_NO_ACCESS:
 443                return PNFS_OSD_ERR_NO_ACCESS;
 444        case OSD_ERR_PRI_UNREACHABLE:
 445                return PNFS_OSD_ERR_UNREACHABLE;
 446        case OSD_ERR_PRI_NOT_FOUND:
 447                return PNFS_OSD_ERR_NOT_FOUND;
 448        case OSD_ERR_PRI_NO_SPACE:
 449                return PNFS_OSD_ERR_NO_SPACE;
 450        default:
 451                WARN_ON(1);
 452                /* fallthrough */
 453        case OSD_ERR_PRI_EIO:
 454                return PNFS_OSD_ERR_EIO;
 455        }
 456}
 457
 458static void _clear_bio(struct bio *bio)
 459{
 460        struct bio_vec *bv;
 461        unsigned i;
 462
 463        __bio_for_each_segment(bv, bio, i, 0) {
 464                unsigned this_count = bv->bv_len;
 465
 466                if (likely(PAGE_SIZE == this_count))
 467                        clear_highpage(bv->bv_page);
 468                else
 469                        zero_user(bv->bv_page, bv->bv_offset, this_count);
 470        }
 471}
 472
 473static int _io_check(struct objio_state *ios, bool is_write)
 474{
 475        enum osd_err_priority oep = OSD_ERR_PRI_NO_ERROR;
 476        int lin_ret = 0;
 477        int i;
 478
 479        for (i = 0; i <  ios->numdevs; i++) {
 480                struct osd_sense_info osi;
 481                struct osd_request *or = ios->per_dev[i].or;
 482                unsigned dev;
 483                int ret;
 484
 485                if (!or)
 486                        continue;
 487
 488                ret = osd_req_decode_sense(or, &osi);
 489                if (likely(!ret))
 490                        continue;
 491
 492                if (OSD_ERR_PRI_CLEAR_PAGES == osi.osd_err_pri) {
 493                        /* start read offset passed endof file */
 494                        BUG_ON(is_write);
 495                        _clear_bio(ios->per_dev[i].bio);
 496                        dprintk("%s: start read offset passed end of file "
 497                                "offset=0x%llx, length=0x%lx\n", __func__,
 498                                _LLU(ios->per_dev[i].offset),
 499                                ios->per_dev[i].length);
 500
 501                        continue; /* we recovered */
 502                }
 503                dev = ios->per_dev[i].dev;
 504                objlayout_io_set_result(&ios->ol_state, dev,
 505                                        &ios->layout->comps[dev].oc_object_id,
 506                                        osd_pri_2_pnfs_err(osi.osd_err_pri),
 507                                        ios->per_dev[i].offset,
 508                                        ios->per_dev[i].length,
 509                                        is_write);
 510
 511                if (osi.osd_err_pri >= oep) {
 512                        oep = osi.osd_err_pri;
 513                        lin_ret = ret;
 514                }
 515        }
 516
 517        return lin_ret;
 518}
 519
 520/*
 521 * Common IO state helpers.
 522 */
 523static void _io_free(struct objio_state *ios)
 524{
 525        unsigned i;
 526
 527        for (i = 0; i < ios->numdevs; i++) {
 528                struct _objio_per_comp *per_dev = &ios->per_dev[i];
 529
 530                if (per_dev->or) {
 531                        osd_end_request(per_dev->or);
 532                        per_dev->or = NULL;
 533                }
 534
 535                if (per_dev->bio) {
 536                        bio_put(per_dev->bio);
 537                        per_dev->bio = NULL;
 538                }
 539        }
 540}
 541
 542struct osd_dev *_io_od(struct objio_state *ios, unsigned dev)
 543{
 544        unsigned min_dev = ios->layout->comps_index;
 545        unsigned max_dev = min_dev + ios->layout->num_comps;
 546
 547        BUG_ON(dev < min_dev || max_dev <= dev);
 548        return ios->layout->ods[dev - min_dev]->od;
 549}
 550
 551struct _striping_info {
 552        u64 obj_offset;
 553        u64 group_length;
 554        unsigned dev;
 555        unsigned unit_off;
 556};
 557
 558static void _calc_stripe_info(struct objio_state *ios, u64 file_offset,
 559                              struct _striping_info *si)
 560{
 561        u32     stripe_unit = ios->layout->stripe_unit;
 562        u32     group_width = ios->layout->group_width;
 563        u64     group_depth = ios->layout->group_depth;
 564        u32     U = stripe_unit * group_width;
 565
 566        u64     T = U * group_depth;
 567        u64     S = T * ios->layout->group_count;
 568        u64     M = div64_u64(file_offset, S);
 569
 570        /*
 571        G = (L - (M * S)) / T
 572        H = (L - (M * S)) % T
 573        */
 574        u64     LmodU = file_offset - M * S;
 575        u32     G = div64_u64(LmodU, T);
 576        u64     H = LmodU - G * T;
 577
 578        u32     N = div_u64(H, U);
 579
 580        div_u64_rem(file_offset, stripe_unit, &si->unit_off);
 581        si->obj_offset = si->unit_off + (N * stripe_unit) +
 582                                  (M * group_depth * stripe_unit);
 583
 584        /* "H - (N * U)" is just "H % U" so it's bound to u32 */
 585        si->dev = (u32)(H - (N * U)) / stripe_unit + G * group_width;
 586        si->dev *= ios->layout->mirrors_p1;
 587
 588        si->group_length = T - H;
 589}
 590
 591static int _add_stripe_unit(struct objio_state *ios,  unsigned *cur_pg,
 592                unsigned pgbase, struct _objio_per_comp *per_dev, int cur_len,
 593                gfp_t gfp_flags)
 594{
 595        unsigned pg = *cur_pg;
 596        struct request_queue *q =
 597                        osd_request_queue(_io_od(ios, per_dev->dev));
 598
 599        per_dev->length += cur_len;
 600
 601        if (per_dev->bio == NULL) {
 602                unsigned stripes = ios->layout->num_comps /
 603                                                     ios->layout->mirrors_p1;
 604                unsigned pages_in_stripe = stripes *
 605                                      (ios->layout->stripe_unit / PAGE_SIZE);
 606                unsigned bio_size = (ios->ol_state.nr_pages + pages_in_stripe) /
 607                                    stripes;
 608
 609                if (BIO_MAX_PAGES_KMALLOC < bio_size)
 610                        bio_size = BIO_MAX_PAGES_KMALLOC;
 611
 612                per_dev->bio = bio_kmalloc(gfp_flags, bio_size);
 613                if (unlikely(!per_dev->bio)) {
 614                        dprintk("Faild to allocate BIO size=%u\n", bio_size);
 615                        return -ENOMEM;
 616                }
 617        }
 618
 619        while (cur_len > 0) {
 620                unsigned pglen = min_t(unsigned, PAGE_SIZE - pgbase, cur_len);
 621                unsigned added_len;
 622
 623                BUG_ON(ios->ol_state.nr_pages <= pg);
 624                cur_len -= pglen;
 625
 626                added_len = bio_add_pc_page(q, per_dev->bio,
 627                                        ios->ol_state.pages[pg], pglen, pgbase);
 628                if (unlikely(pglen != added_len))
 629                        return -ENOMEM;
 630                pgbase = 0;
 631                ++pg;
 632        }
 633        BUG_ON(cur_len);
 634
 635        *cur_pg = pg;
 636        return 0;
 637}
 638
 639static int _prepare_one_group(struct objio_state *ios, u64 length,
 640                              struct _striping_info *si, unsigned *last_pg,
 641                              gfp_t gfp_flags)
 642{
 643        unsigned stripe_unit = ios->layout->stripe_unit;
 644        unsigned mirrors_p1 = ios->layout->mirrors_p1;
 645        unsigned devs_in_group = ios->layout->group_width * mirrors_p1;
 646        unsigned dev = si->dev;
 647        unsigned first_dev = dev - (dev % devs_in_group);
 648        unsigned max_comp = ios->numdevs ? ios->numdevs - mirrors_p1 : 0;
 649        unsigned cur_pg = *last_pg;
 650        int ret = 0;
 651
 652        while (length) {
 653                struct _objio_per_comp *per_dev = &ios->per_dev[dev];
 654                unsigned cur_len, page_off = 0;
 655
 656                if (!per_dev->length) {
 657                        per_dev->dev = dev;
 658                        if (dev < si->dev) {
 659                                per_dev->offset = si->obj_offset + stripe_unit -
 660                                                                   si->unit_off;
 661                                cur_len = stripe_unit;
 662                        } else if (dev == si->dev) {
 663                                per_dev->offset = si->obj_offset;
 664                                cur_len = stripe_unit - si->unit_off;
 665                                page_off = si->unit_off & ~PAGE_MASK;
 666                                BUG_ON(page_off &&
 667                                      (page_off != ios->ol_state.pgbase));
 668                        } else { /* dev > si->dev */
 669                                per_dev->offset = si->obj_offset - si->unit_off;
 670                                cur_len = stripe_unit;
 671                        }
 672
 673                        if (max_comp < dev)
 674                                max_comp = dev;
 675                } else {
 676                        cur_len = stripe_unit;
 677                }
 678                if (cur_len >= length)
 679                        cur_len = length;
 680
 681                ret = _add_stripe_unit(ios, &cur_pg, page_off , per_dev,
 682                                       cur_len, gfp_flags);
 683                if (unlikely(ret))
 684                        goto out;
 685
 686                dev += mirrors_p1;
 687                dev = (dev % devs_in_group) + first_dev;
 688
 689                length -= cur_len;
 690                ios->length += cur_len;
 691        }
 692out:
 693        ios->numdevs = max_comp + mirrors_p1;
 694        *last_pg = cur_pg;
 695        return ret;
 696}
 697
 698static int _io_rw_pagelist(struct objio_state *ios, gfp_t gfp_flags)
 699{
 700        u64 length = ios->ol_state.count;
 701        u64 offset = ios->ol_state.offset;
 702        struct _striping_info si;
 703        unsigned last_pg = 0;
 704        int ret = 0;
 705
 706        while (length) {
 707                _calc_stripe_info(ios, offset, &si);
 708
 709                if (length < si.group_length)
 710                        si.group_length = length;
 711
 712                ret = _prepare_one_group(ios, si.group_length, &si, &last_pg, gfp_flags);
 713                if (unlikely(ret))
 714                        goto out;
 715
 716                offset += si.group_length;
 717                length -= si.group_length;
 718        }
 719
 720out:
 721        if (!ios->length)
 722                return ret;
 723
 724        return 0;
 725}
 726
 727static ssize_t _sync_done(struct objio_state *ios)
 728{
 729        struct completion *waiting = ios->private;
 730
 731        complete(waiting);
 732        return 0;
 733}
 734
 735static void _last_io(struct kref *kref)
 736{
 737        struct objio_state *ios = container_of(kref, struct objio_state, kref);
 738
 739        ios->done(ios);
 740}
 741
 742static void _done_io(struct osd_request *or, void *p)
 743{
 744        struct objio_state *ios = p;
 745
 746        kref_put(&ios->kref, _last_io);
 747}
 748
 749static ssize_t _io_exec(struct objio_state *ios)
 750{
 751        DECLARE_COMPLETION_ONSTACK(wait);
 752        ssize_t status = 0; /* sync status */
 753        unsigned i;
 754        objio_done_fn saved_done_fn = ios->done;
 755        bool sync = ios->ol_state.sync;
 756
 757        if (sync) {
 758                ios->done = _sync_done;
 759                ios->private = &wait;
 760        }
 761
 762        kref_init(&ios->kref);
 763
 764        for (i = 0; i < ios->numdevs; i++) {
 765                struct osd_request *or = ios->per_dev[i].or;
 766
 767                if (!or)
 768                        continue;
 769
 770                kref_get(&ios->kref);
 771                osd_execute_request_async(or, _done_io, ios);
 772        }
 773
 774        kref_put(&ios->kref, _last_io);
 775
 776        if (sync) {
 777                wait_for_completion(&wait);
 778                status = saved_done_fn(ios);
 779        }
 780
 781        return status;
 782}
 783
 784/*
 785 * read
 786 */
 787static ssize_t _read_done(struct objio_state *ios)
 788{
 789        ssize_t status;
 790        int ret = _io_check(ios, false);
 791
 792        _io_free(ios);
 793
 794        if (likely(!ret))
 795                status = ios->length;
 796        else
 797                status = ret;
 798
 799        objlayout_read_done(&ios->ol_state, status, ios->ol_state.sync);
 800        return status;
 801}
 802
 803static int _read_mirrors(struct objio_state *ios, unsigned cur_comp)
 804{
 805        struct osd_request *or = NULL;
 806        struct _objio_per_comp *per_dev = &ios->per_dev[cur_comp];
 807        unsigned dev = per_dev->dev;
 808        struct pnfs_osd_object_cred *cred =
 809                        &ios->layout->comps[dev];
 810        struct osd_obj_id obj = {
 811                .partition = cred->oc_object_id.oid_partition_id,
 812                .id = cred->oc_object_id.oid_object_id,
 813        };
 814        int ret;
 815
 816        or = osd_start_request(_io_od(ios, dev), GFP_KERNEL);
 817        if (unlikely(!or)) {
 818                ret = -ENOMEM;
 819                goto err;
 820        }
 821        per_dev->or = or;
 822
 823        osd_req_read(or, &obj, per_dev->offset, per_dev->bio, per_dev->length);
 824
 825        ret = osd_finalize_request(or, 0, cred->oc_cap.cred, NULL);
 826        if (ret) {
 827                dprintk("%s: Faild to osd_finalize_request() => %d\n",
 828                        __func__, ret);
 829                goto err;
 830        }
 831
 832        dprintk("%s:[%d] dev=%d obj=0x%llx start=0x%llx length=0x%lx\n",
 833                __func__, cur_comp, dev, obj.id, _LLU(per_dev->offset),
 834                per_dev->length);
 835
 836err:
 837        return ret;
 838}
 839
 840static ssize_t _read_exec(struct objio_state *ios)
 841{
 842        unsigned i;
 843        int ret;
 844
 845        for (i = 0; i < ios->numdevs; i += ios->layout->mirrors_p1) {
 846                if (!ios->per_dev[i].length)
 847                        continue;
 848                ret = _read_mirrors(ios, i);
 849                if (unlikely(ret))
 850                        goto err;
 851        }
 852
 853        ios->done = _read_done;
 854        return _io_exec(ios); /* In sync mode exec returns the io status */
 855
 856err:
 857        _io_free(ios);
 858        return ret;
 859}
 860
 861ssize_t objio_read_pagelist(struct objlayout_io_state *ol_state)
 862{
 863        struct objio_state *ios = container_of(ol_state, struct objio_state,
 864                                               ol_state);
 865        int ret;
 866
 867        ret = _io_rw_pagelist(ios, GFP_KERNEL);
 868        if (unlikely(ret))
 869                return ret;
 870
 871        return _read_exec(ios);
 872}
 873
 874/*
 875 * write
 876 */
 877static ssize_t _write_done(struct objio_state *ios)
 878{
 879        ssize_t status;
 880        int ret = _io_check(ios, true);
 881
 882        _io_free(ios);
 883
 884        if (likely(!ret)) {
 885                /* FIXME: should be based on the OSD's persistence model
 886                 * See OSD2r05 Section 4.13 Data persistence model */
 887                ios->ol_state.committed = NFS_FILE_SYNC;
 888                status = ios->length;
 889        } else {
 890                status = ret;
 891        }
 892
 893        objlayout_write_done(&ios->ol_state, status, ios->ol_state.sync);
 894        return status;
 895}
 896
 897static int _write_mirrors(struct objio_state *ios, unsigned cur_comp)
 898{
 899        struct _objio_per_comp *master_dev = &ios->per_dev[cur_comp];
 900        unsigned dev = ios->per_dev[cur_comp].dev;
 901        unsigned last_comp = cur_comp + ios->layout->mirrors_p1;
 902        int ret;
 903
 904        for (; cur_comp < last_comp; ++cur_comp, ++dev) {
 905                struct osd_request *or = NULL;
 906                struct pnfs_osd_object_cred *cred =
 907                                        &ios->layout->comps[dev];
 908                struct osd_obj_id obj = {
 909                        .partition = cred->oc_object_id.oid_partition_id,
 910                        .id = cred->oc_object_id.oid_object_id,
 911                };
 912                struct _objio_per_comp *per_dev = &ios->per_dev[cur_comp];
 913                struct bio *bio;
 914
 915                or = osd_start_request(_io_od(ios, dev), GFP_NOFS);
 916                if (unlikely(!or)) {
 917                        ret = -ENOMEM;
 918                        goto err;
 919                }
 920                per_dev->or = or;
 921
 922                if (per_dev != master_dev) {
 923                        bio = bio_kmalloc(GFP_NOFS,
 924                                          master_dev->bio->bi_max_vecs);
 925                        if (unlikely(!bio)) {
 926                                dprintk("Faild to allocate BIO size=%u\n",
 927                                        master_dev->bio->bi_max_vecs);
 928                                ret = -ENOMEM;
 929                                goto err;
 930                        }
 931
 932                        __bio_clone(bio, master_dev->bio);
 933                        bio->bi_bdev = NULL;
 934                        bio->bi_next = NULL;
 935                        per_dev->bio = bio;
 936                        per_dev->dev = dev;
 937                        per_dev->length = master_dev->length;
 938                        per_dev->offset =  master_dev->offset;
 939                } else {
 940                        bio = master_dev->bio;
 941                        bio->bi_rw |= REQ_WRITE;
 942                }
 943
 944                osd_req_write(or, &obj, per_dev->offset, bio, per_dev->length);
 945
 946                ret = osd_finalize_request(or, 0, cred->oc_cap.cred, NULL);
 947                if (ret) {
 948                        dprintk("%s: Faild to osd_finalize_request() => %d\n",
 949                                __func__, ret);
 950                        goto err;
 951                }
 952
 953                dprintk("%s:[%d] dev=%d obj=0x%llx start=0x%llx length=0x%lx\n",
 954                        __func__, cur_comp, dev, obj.id, _LLU(per_dev->offset),
 955                        per_dev->length);
 956        }
 957
 958err:
 959        return ret;
 960}
 961
 962static ssize_t _write_exec(struct objio_state *ios)
 963{
 964        unsigned i;
 965        int ret;
 966
 967        for (i = 0; i < ios->numdevs; i += ios->layout->mirrors_p1) {
 968                if (!ios->per_dev[i].length)
 969                        continue;
 970                ret = _write_mirrors(ios, i);
 971                if (unlikely(ret))
 972                        goto err;
 973        }
 974
 975        ios->done = _write_done;
 976        return _io_exec(ios); /* In sync mode exec returns the io->status */
 977
 978err:
 979        _io_free(ios);
 980        return ret;
 981}
 982
 983ssize_t objio_write_pagelist(struct objlayout_io_state *ol_state, bool stable)
 984{
 985        struct objio_state *ios = container_of(ol_state, struct objio_state,
 986                                               ol_state);
 987        int ret;
 988
 989        /* TODO: ios->stable = stable; */
 990        ret = _io_rw_pagelist(ios, GFP_NOFS);
 991        if (unlikely(ret))
 992                return ret;
 993
 994        return _write_exec(ios);
 995}
 996
 997static bool objio_pg_test(struct nfs_pageio_descriptor *pgio,
 998                          struct nfs_page *prev, struct nfs_page *req)
 999{
1000        if (!pnfs_generic_pg_test(pgio, prev, req))
1001                return false;
1002
1003        if (pgio->pg_lseg == NULL)
1004                return true;
1005
1006        return pgio->pg_count + req->wb_bytes <=
1007                        OBJIO_LSEG(pgio->pg_lseg)->max_io_size;
1008}
1009
1010static struct pnfs_layoutdriver_type objlayout_type = {
1011        .id = LAYOUT_OSD2_OBJECTS,
1012        .name = "LAYOUT_OSD2_OBJECTS",
1013        .flags                   = PNFS_LAYOUTRET_ON_SETATTR,
1014
1015        .alloc_layout_hdr        = objlayout_alloc_layout_hdr,
1016        .free_layout_hdr         = objlayout_free_layout_hdr,
1017
1018        .alloc_lseg              = objlayout_alloc_lseg,
1019        .free_lseg               = objlayout_free_lseg,
1020
1021        .read_pagelist           = objlayout_read_pagelist,
1022        .write_pagelist          = objlayout_write_pagelist,
1023        .pg_test                 = objio_pg_test,
1024
1025        .free_deviceid_node      = objio_free_deviceid_node,
1026
1027        .encode_layoutcommit     = objlayout_encode_layoutcommit,
1028        .encode_layoutreturn     = objlayout_encode_layoutreturn,
1029};
1030
/* Module metadata: description, author and license tag. */
1031MODULE_DESCRIPTION("pNFS Layout Driver for OSD2 objects");
1032MODULE_AUTHOR("Benny Halevy <bhalevy@panasas.com>");
1033MODULE_LICENSE("GPL");
1034
1035static int __init
1036objlayout_init(void)
1037{
1038        int ret = pnfs_register_layoutdriver(&objlayout_type);
1039
1040        if (ret)
1041                printk(KERN_INFO
1042                        "%s: Registering OSD pNFS Layout Driver failed: error=%d\n",
1043                        __func__, ret);
1044        else
1045                printk(KERN_INFO "%s: Registered OSD pNFS Layout Driver\n",
1046                        __func__);
1047        return ret;
1048}
1049
1050static void __exit
1051objlayout_exit(void)
1052{
1053        pnfs_unregister_layoutdriver(&objlayout_type);
1054        printk(KERN_INFO "%s: Unregistered OSD pNFS Layout Driver\n",
1055               __func__);
1056}
1057
/* Declare objlayout_init/objlayout_exit as the module's entry and exit points. */
1058module_init(objlayout_init);
1059module_exit(objlayout_exit);
1060