linux/net/sunrpc/xprtrdma/svc_rdma_pcl.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2020 Oracle. All rights reserved.
 */

#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/rpc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

/**
 * pcl_free - Release all memory associated with a parsed chunk list
 * @pcl: parsed chunk list
 */
void pcl_free(struct svc_rdma_pcl *pcl)
{
        while (!list_empty(&pcl->cl_chunks)) {
                struct svc_rdma_chunk *chunk;

                chunk = pcl_first_chunk(pcl);
                list_del(&chunk->ch_list);
                kfree(chunk);
        }
}
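
/*
 * Illustrative usage (a sketch, not part of this file): a receive
 * path that parsed all four chunk lists might release them together
 * when retiring the receive context. The rc_*_pcl field names are
 * assumed from how this file uses struct svc_rdma_recv_ctxt:
 *
 *      pcl_free(&rctxt->rc_call_pcl);
 *      pcl_free(&rctxt->rc_read_pcl);
 *      pcl_free(&rctxt->rc_write_pcl);
 *      pcl_free(&rctxt->rc_reply_pcl);
 */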

/* Allocate a chunk with room for @segcount segments, recording the
 * chunk's XDR @position. The length fields and segment count start at
 * zero and grow as segments are decoded into the chunk.
 */
static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
{
        struct svc_rdma_chunk *chunk;

        chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
        if (!chunk)
                return NULL;

        chunk->ch_position = position;
        chunk->ch_length = 0;
        chunk->ch_payload_length = 0;
        chunk->ch_segcount = 0;
        return chunk;
}
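
/*
 * Note on the allocation above: struct_size() sizes a structure that
 * ends in a flexible array member, saturating at SIZE_MAX if the
 * arithmetic overflows. Here it evaluates to the equivalent of
 *
 *      sizeof(struct svc_rdma_chunk) +
 *              segcount * sizeof(struct svc_rdma_segment)
 *
 * so the trailing ch_segments[] array can hold @segcount entries.
 */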

/* Return the chunk in @pcl whose XDR position equals @position, or
 * NULL if no such chunk has been inserted yet.
 */
static struct svc_rdma_chunk *
pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
{
        struct svc_rdma_chunk *pos;

        pcl_for_each_chunk(pos, pcl) {
                if (pos->ch_position == position)
                        return pos;
        }
        return NULL;
}

/* Insert @chunk into @pcl so the list remains sorted in ascending
 * ch_position order, then account for the new chunk in cl_count.
 */
static void pcl_insert_position(struct svc_rdma_pcl *pcl,
                                struct svc_rdma_chunk *chunk)
{
        struct svc_rdma_chunk *pos;

        pcl_for_each_chunk(pos, pcl) {
                if (pos->ch_position > chunk->ch_position)
                        break;
        }
        __list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
        pcl->cl_count++;
}
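
/*
 * For example (illustrative only): chunks inserted with positions 8,
 * 0, then 4 yield a list ordered 0 -> 4 -> 8. When the loop above
 * finds no larger position, @pos comes to rest on the list head, so
 * __list_add() appends @chunk at the tail.
 */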

/* Record one decoded Read segment in the next free slot of @chunk and
 * fold its length into the chunk's running byte count.
 */
static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
                                 struct svc_rdma_chunk *chunk,
                                 u32 handle, u32 length, u64 offset)
{
        struct svc_rdma_segment *segment;

        segment = &chunk->ch_segments[chunk->ch_segcount];
        segment->rs_handle = handle;
        segment->rs_length = length;
        segment->rs_offset = offset;

        trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);

        chunk->ch_length += length;
        chunk->ch_segcount++;
}

/**
 * pcl_alloc_call - Construct a parsed chunk list for the Call body
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *       %true: Parsed chunk list was successfully constructed, and
 *              cl_count is updated to be the number of chunks (i.e.
 *              unique positions) in the Read list.
 *      %false: Memory allocation failed.
 */
bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
        struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
        unsigned int i, segcount = pcl->cl_count;

        pcl->cl_count = 0;
        for (i = 0; i < segcount; i++) {
                struct svc_rdma_chunk *chunk;
                u32 position, handle, length;
                u64 offset;

                p++;    /* skip the list discriminator */
                p = xdr_decode_read_segment(p, &position, &handle,
                                            &length, &offset);
                if (position != 0)
                        continue;

                if (pcl_is_empty(pcl)) {
                        chunk = pcl_alloc_chunk(segcount, position);
                        if (!chunk)
                                return false;
                        pcl_insert_position(pcl, chunk);
                } else {
                        chunk = list_first_entry(&pcl->cl_chunks,
                                                 struct svc_rdma_chunk,
                                                 ch_list);
                }

                pcl_set_read_segment(rctxt, chunk, handle, length, offset);
        }

        return true;
}
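
/*
 * Hypothetical caller sketch (names assumed, not from this file): a
 * Read list decoder counts the raw segments into both cl_count fields
 * and then parses the same list twice, so that position-zero segments
 * land in rc_call_pcl and all others in rc_read_pcl:
 *
 *      rctxt->rc_call_pcl.cl_count = nsegments;
 *      rctxt->rc_read_pcl.cl_count = nsegments;
 *      if (!pcl_alloc_call(rctxt, read_list))
 *              return false;
 *      if (!pcl_alloc_read(rctxt, read_list))
 *              return false;
 */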

/**
 * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *       %true: Parsed chunk list was successfully constructed, and
 *              cl_count is updated to be the number of chunks (i.e.
 *              unique position values) in the Read list.
 *      %false: Memory allocation failed.
 *
 * TODO:
 * - Check for chunk range overlaps
 */
bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
        struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
        unsigned int i, segcount = pcl->cl_count;

        pcl->cl_count = 0;
        for (i = 0; i < segcount; i++) {
                struct svc_rdma_chunk *chunk;
                u32 position, handle, length;
                u64 offset;

                p++;    /* skip the list discriminator */
                p = xdr_decode_read_segment(p, &position, &handle,
                                            &length, &offset);
                if (position == 0)
                        continue;

                chunk = pcl_lookup_position(pcl, position);
                if (!chunk) {
                        chunk = pcl_alloc_chunk(segcount, position);
                        if (!chunk)
                                return false;
                        pcl_insert_position(pcl, chunk);
                }

                pcl_set_read_segment(rctxt, chunk, handle, length, offset);
        }

        return true;
}
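
/*
 * Worked example (illustrative): a Read list with segments at
 * positions 4, 4, and 8 produces two chunks. Both position-4 segments
 * are gathered into one chunk via pcl_lookup_position(), the
 * position-8 segment gets a chunk of its own, and cl_count finishes
 * at 2. Position-zero segments are skipped here; pcl_alloc_call()
 * handles those.
 */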

/**
 * pcl_alloc_write - Construct a parsed chunk list from a Write list
 * @rctxt: Ingress receive context
 * @pcl: Parsed chunk list to populate
 * @p: Start of an un-decoded Write list
 *
 * Assumptions:
 * - The incoming Write list has already been sanity checked, and
 * - cl_count is set to the number of chunks in the un-decoded list.
 *
 * Return values:
 *       %true: Parsed chunk list was successfully constructed.
 *      %false: Memory allocation failed.
 */
bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
                     struct svc_rdma_pcl *pcl, __be32 *p)
{
        struct svc_rdma_segment *segment;
        struct svc_rdma_chunk *chunk;
        unsigned int i, j;
        u32 segcount;

        for (i = 0; i < pcl->cl_count; i++) {
                p++;    /* skip the list discriminator */
                segcount = be32_to_cpup(p++);

                chunk = pcl_alloc_chunk(segcount, 0);
                if (!chunk)
                        return false;
                list_add_tail(&chunk->ch_list, &pcl->cl_chunks);

                for (j = 0; j < segcount; j++) {
                        segment = &chunk->ch_segments[j];
                        p = xdr_decode_rdma_segment(p, &segment->rs_handle,
                                                    &segment->rs_length,
                                                    &segment->rs_offset);
                        trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);

                        chunk->ch_length += segment->rs_length;
                        chunk->ch_segcount++;
                }
        }
        return true;
}
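
/*
 * Wire layout consumed above, per RFC 8166: each Write chunk is a
 * list discriminator, a segment count, then that many RDMA segments,
 * each carrying a 32-bit handle, a 32-bit length, and a 64-bit offset:
 *
 *      <discriminator> <segcount>
 *      <handle> <length> <offset-hi> <offset-lo>  (x segcount)
 *
 * so one chunk occupies 2 + 4 * segcount XDR words.
 */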

/* Carve @length bytes starting at @offset out of @xdr and pass the
 * resulting sub-buffer to @actor. Zero-length regions are skipped.
 */
static int pcl_process_region(const struct xdr_buf *xdr,
                              unsigned int offset, unsigned int length,
                              int (*actor)(const struct xdr_buf *, void *),
                              void *data)
{
        struct xdr_buf subbuf;

        if (!length)
                return 0;
        if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
                return -EMSGSIZE;
        return actor(&subbuf, data);
}

/**
 * pcl_process_nonpayloads - Process non-payload regions inside @xdr
 * @pcl: Chunk list to process
 * @xdr: xdr_buf to process
 * @actor: Function to invoke on each non-payload region
 * @data: Arguments for @actor
 *
 * This mechanism must ignore not only result payloads that were already
 * sent via RDMA Write, but also XDR padding for those payloads that
 * the upper layer has added.
 *
 * Assumptions:
 *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
 *
 * Returns:
 *   On success, zero,
 *   %-EMSGSIZE on XDR buffer overflow, or
 *   The return value of @actor
 */
int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
                            const struct xdr_buf *xdr,
                            int (*actor)(const struct xdr_buf *, void *),
                            void *data)
{
        struct svc_rdma_chunk *chunk, *next;
        unsigned int start;
        int ret;

        chunk = pcl_first_chunk(pcl);

        /* No result payloads were generated */
        if (!chunk || !chunk->ch_payload_length)
                return actor(xdr, data);

        /* Process the region before the first result payload */
        ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
        if (ret < 0)
                return ret;

        /* Process the regions between each middle result payload */
        while ((next = pcl_next_chunk(pcl, chunk))) {
                if (!next->ch_payload_length)
                        break;

                start = pcl_chunk_end_offset(chunk);
                ret = pcl_process_region(xdr, start, next->ch_position - start,
                                         actor, data);
                if (ret < 0)
                        return ret;

                chunk = next;
        }

        /* Process the region after the last result payload */
        start = pcl_chunk_end_offset(chunk);
        ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
        if (ret < 0)
                return ret;

        return 0;
}
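
/*
 * Worked example (illustrative, numbers assumed): say xdr->len is 200
 * and one result payload was recorded at ch_position 40 with
 * ch_payload_length 100. Assuming pcl_chunk_end_offset() rounds
 * ch_position + ch_payload_length up to XDR alignment, it returns
 * 140, and @actor runs twice: once on bytes [0, 40) and once on bytes
 * [140, 200). The payload bytes themselves were already conveyed via
 * RDMA Write, so @actor never sees them.
 */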