linux/drivers/staging/ramster/r2net.c
<<
>>
Prefs
   1/*
   2 * r2net.c
   3 *
   4 * Copyright (c) 2011, Dan Magenheimer, Oracle Corp.
   5 *
   6 * Ramster_r2net provides an interface between zcache and r2net.
   7 *
   8 * FIXME: support more than two nodes
   9 */
  10
  11#include <linux/list.h>
  12#include "cluster/tcp.h"
  13#include "cluster/nodemanager.h"
  14#include "tmem.h"
  15#include "zcache.h"
  16#include "ramster.h"
  17
  18#define RAMSTER_TESTING
  19
  20#define RMSTR_KEY       0x77347734
  21
  22enum {
  23        RMSTR_TMEM_PUT_EPH = 100,
  24        RMSTR_TMEM_PUT_PERS,
  25        RMSTR_TMEM_ASYNC_GET_REQUEST,
  26        RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
  27        RMSTR_TMEM_ASYNC_GET_REPLY,
  28        RMSTR_TMEM_FLUSH,
  29        RMSTR_TMEM_FLOBJ,
  30        RMSTR_TMEM_DESTROY_POOL,
  31};
  32
  33#define RMSTR_R2NET_MAX_LEN \
  34                (R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
  35
  36#include "cluster/tcp_internal.h"
  37
  38static struct r2nm_node *r2net_target_node;
  39static int r2net_target_nodenum;
  40
  41int r2net_remote_target_node_set(int node_num)
  42{
  43        int ret = -1;
  44
  45        r2net_target_node = r2nm_get_node_by_num(node_num);
  46        if (r2net_target_node != NULL) {
  47                r2net_target_nodenum = node_num;
  48                r2nm_node_put(r2net_target_node);
  49                ret = 0;
  50        }
  51        return ret;
  52}
  53
  54/* FIXME following buffer should be per-cpu, protected by preempt_disable */
  55static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
  56
  57static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
  58                                u32 len, void *data, void **ret_data)
  59{
  60        char *pdata;
  61        struct tmem_xhandle xh;
  62        int found;
  63        size_t size = RMSTR_R2NET_MAX_LEN;
  64        u16 msgtype = be16_to_cpu(msg->msg_type);
  65        bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
  66        unsigned long flags;
  67
  68        xh = *(struct tmem_xhandle *)msg->buf;
  69        if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
  70                BUG();
  71        pdata = ramster_async_get_buf;
  72        *(struct tmem_xhandle *)pdata = xh;
  73        pdata += sizeof(struct tmem_xhandle);
  74        local_irq_save(flags);
  75        found = zcache_get(xh.client_id, xh.pool_id, &xh.oid, xh.index,
  76                                pdata, &size, 1, get_and_free ? 1 : -1);
  77        local_irq_restore(flags);
  78        if (found < 0) {
  79                /* a zero size indicates the get failed */
  80                size = 0;
  81        }
  82        if (size > RMSTR_R2NET_MAX_LEN)
  83                BUG();
  84        *ret_data = pdata - sizeof(struct tmem_xhandle);
  85        /* now make caller (r2net_process_message) handle specially */
  86        r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
  87        return size + sizeof(struct tmem_xhandle);
  88}
  89
  90static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
  91                                u32 len, void *data, void **ret_data)
  92{
  93        char *in = (char *)msg->buf;
  94        int datalen = len - sizeof(struct r2net_msg);
  95        int ret = -1;
  96        struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
  97
  98        in += sizeof(struct tmem_xhandle);
  99        datalen -= sizeof(struct tmem_xhandle);
 100        BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
 101        ret = zcache_localify(xh->pool_id, &xh->oid, xh->index,
 102                                in, datalen, xh->extra);
 103#ifdef RAMSTER_TESTING
 104        if (ret == -EEXIST)
 105                pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
 106#endif
 107        return ret;
 108}
 109
 110int ramster_remote_put_handler(struct r2net_msg *msg,
 111                                u32 len, void *data, void **ret_data)
 112{
 113        struct tmem_xhandle *xh;
 114        char *p = (char *)msg->buf;
 115        int datalen = len - sizeof(struct r2net_msg) -
 116                                sizeof(struct tmem_xhandle);
 117        u16 msgtype = be16_to_cpu(msg->msg_type);
 118        bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
 119        unsigned long flags;
 120        int ret;
 121
 122        xh = (struct tmem_xhandle *)p;
 123        p += sizeof(struct tmem_xhandle);
 124        zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
 125        local_irq_save(flags);
 126        ret = zcache_put(xh->client_id, xh->pool_id, &xh->oid, xh->index,
 127                                p, datalen, 1, ephemeral ? 1 : -1);
 128        local_irq_restore(flags);
 129        return ret;
 130}
 131
 132int ramster_remote_flush_handler(struct r2net_msg *msg,
 133                                u32 len, void *data, void **ret_data)
 134{
 135        struct tmem_xhandle *xh;
 136        char *p = (char *)msg->buf;
 137
 138        xh = (struct tmem_xhandle *)p;
 139        p += sizeof(struct tmem_xhandle);
 140        (void)zcache_flush(xh->client_id, xh->pool_id, &xh->oid, xh->index);
 141        return 0;
 142}
 143
 144int ramster_remote_flobj_handler(struct r2net_msg *msg,
 145                                u32 len, void *data, void **ret_data)
 146{
 147        struct tmem_xhandle *xh;
 148        char *p = (char *)msg->buf;
 149
 150        xh = (struct tmem_xhandle *)p;
 151        p += sizeof(struct tmem_xhandle);
 152        (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
 153        return 0;
 154}
 155
 156int ramster_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
 157                                size_t expect_size, uint8_t expect_cksum,
 158                                void *extra)
 159{
 160        int ret = -1, status;
 161        struct r2nm_node *node = NULL;
 162        struct kvec vec[1];
 163        size_t veclen = 1;
 164        u32 msg_type;
 165
 166        node = r2nm_get_node_by_num(remotenode);
 167        if (node == NULL)
 168                goto out;
 169        xh->client_id = r2nm_this_node(); /* which node is getting */
 170        xh->xh_data_cksum = expect_cksum;
 171        xh->xh_data_size = expect_size;
 172        xh->extra = extra;
 173        vec[0].iov_len = sizeof(*xh);
 174        vec[0].iov_base = xh;
 175        if (free)
 176                msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
 177        else
 178                msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
 179        ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
 180                                        vec, veclen, remotenode, &status);
 181        r2nm_node_put(node);
 182        if (ret < 0) {
 183                /* FIXME handle bad message possibilities here? */
 184                pr_err("UNTESTED ret<0 in ramster_remote_async_get\n");
 185        }
 186        ret = status;
 187out:
 188        return ret;
 189}
 190
 191#ifdef RAMSTER_TESTING
 192/* leave me here to see if it catches a weird crash */
 193static void ramster_check_irq_counts(void)
 194{
 195        static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;
 196        int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt;
 197
 198        cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT;
 199        if (cur_hardirq_cnt > last_hardirq_cnt) {
 200                last_hardirq_cnt = cur_hardirq_cnt;
 201                if (!(last_hardirq_cnt&(last_hardirq_cnt-1)))
 202                        pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
 203                                last_hardirq_cnt);
 204        }
 205        cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT;
 206        if (cur_softirq_cnt > last_softirq_cnt) {
 207                last_softirq_cnt = cur_softirq_cnt;
 208                if (!(last_softirq_cnt&(last_softirq_cnt-1)))
 209                        pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
 210                                last_softirq_cnt);
 211        }
 212        cur_preempt_cnt = preempt_count() & PREEMPT_MASK;
 213        if (cur_preempt_cnt > last_preempt_cnt) {
 214                last_preempt_cnt = cur_preempt_cnt;
 215                if (!(last_preempt_cnt&(last_preempt_cnt-1)))
 216                        pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
 217                                last_preempt_cnt);
 218        }
 219}
 220#endif
 221
 222int ramster_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
 223                                bool ephemeral, int *remotenode)
 224{
 225        int nodenum, ret = -1, status;
 226        struct r2nm_node *node = NULL;
 227        struct kvec vec[2];
 228        size_t veclen = 2;
 229        u32 msg_type;
 230#ifdef RAMSTER_TESTING
 231        struct r2net_node *nn;
 232#endif
 233
 234        BUG_ON(size > RMSTR_R2NET_MAX_LEN);
 235        xh->client_id = r2nm_this_node(); /* which node is putting */
 236        vec[0].iov_len = sizeof(*xh);
 237        vec[0].iov_base = xh;
 238        vec[1].iov_len = size;
 239        vec[1].iov_base = data;
 240        node = r2net_target_node;
 241        if (!node)
 242                goto out;
 243
 244        nodenum = r2net_target_nodenum;
 245
 246        r2nm_node_get(node);
 247
 248#ifdef RAMSTER_TESTING
 249        nn = r2net_nn_from_num(nodenum);
 250        WARN_ON_ONCE(nn->nn_persistent_error || !nn->nn_sc_valid);
 251#endif
 252
 253        if (ephemeral)
 254                msg_type = RMSTR_TMEM_PUT_EPH;
 255        else
 256                msg_type = RMSTR_TMEM_PUT_PERS;
 257#ifdef RAMSTER_TESTING
 258        /* leave me here to see if it catches a weird crash */
 259        ramster_check_irq_counts();
 260#endif
 261
 262        ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
 263                                                nodenum, &status);
 264#ifdef RAMSTER_TESTING
 265        if (ret != 0) {
 266                static unsigned long cnt;
 267                cnt++;
 268                if (!(cnt&(cnt-1)))
 269                        pr_err("ramster_remote_put: message failed, "
 270                                "ret=%d, cnt=%lu\n", ret, cnt);
 271                ret = -1;
 272        }
 273#endif
 274        if (ret < 0)
 275                ret = -1;
 276        else {
 277                ret = status;
 278                *remotenode = nodenum;
 279        }
 280
 281        r2nm_node_put(node);
 282out:
 283        return ret;
 284}
 285
 286int ramster_remote_flush(struct tmem_xhandle *xh, int remotenode)
 287{
 288        int ret = -1, status;
 289        struct r2nm_node *node = NULL;
 290        struct kvec vec[1];
 291        size_t veclen = 1;
 292
 293        node = r2nm_get_node_by_num(remotenode);
 294        BUG_ON(node == NULL);
 295        xh->client_id = r2nm_this_node(); /* which node is flushing */
 296        vec[0].iov_len = sizeof(*xh);
 297        vec[0].iov_base = xh;
 298        BUG_ON(irqs_disabled());
 299        BUG_ON(in_softirq());
 300        ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
 301                                        vec, veclen, remotenode, &status);
 302        r2nm_node_put(node);
 303        return ret;
 304}
 305
 306int ramster_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
 307{
 308        int ret = -1, status;
 309        struct r2nm_node *node = NULL;
 310        struct kvec vec[1];
 311        size_t veclen = 1;
 312
 313        node = r2nm_get_node_by_num(remotenode);
 314        BUG_ON(node == NULL);
 315        xh->client_id = r2nm_this_node(); /* which node is flobjing */
 316        vec[0].iov_len = sizeof(*xh);
 317        vec[0].iov_base = xh;
 318        ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
 319                                        vec, veclen, remotenode, &status);
 320        r2nm_node_put(node);
 321        return ret;
 322}
 323
 324/*
 325 * Handler registration
 326 */
 327
 328static LIST_HEAD(r2net_unreg_list);
 329
 330static void r2net_unregister_handlers(void)
 331{
 332        r2net_unregister_handler_list(&r2net_unreg_list);
 333}
 334
 335int r2net_register_handlers(void)
 336{
 337        int status;
 338
 339        status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
 340                                RMSTR_R2NET_MAX_LEN,
 341                                ramster_remote_put_handler,
 342                                NULL, NULL, &r2net_unreg_list);
 343        if (status)
 344                goto bail;
 345
 346        status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
 347                                RMSTR_R2NET_MAX_LEN,
 348                                ramster_remote_put_handler,
 349                                NULL, NULL, &r2net_unreg_list);
 350        if (status)
 351                goto bail;
 352
 353        status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
 354                                RMSTR_R2NET_MAX_LEN,
 355                                ramster_remote_async_get_request_handler,
 356                                NULL, NULL,
 357                                &r2net_unreg_list);
 358        if (status)
 359                goto bail;
 360
 361        status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
 362                                RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
 363                                ramster_remote_async_get_request_handler,
 364                                NULL, NULL,
 365                                &r2net_unreg_list);
 366        if (status)
 367                goto bail;
 368
 369        status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
 370                                RMSTR_R2NET_MAX_LEN,
 371                                ramster_remote_async_get_reply_handler,
 372                                NULL, NULL,
 373                                &r2net_unreg_list);
 374        if (status)
 375                goto bail;
 376
 377        status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
 378                                RMSTR_R2NET_MAX_LEN,
 379                                ramster_remote_flush_handler,
 380                                NULL, NULL,
 381                                &r2net_unreg_list);
 382        if (status)
 383                goto bail;
 384
 385        status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
 386                                RMSTR_R2NET_MAX_LEN,
 387                                ramster_remote_flobj_handler,
 388                                NULL, NULL,
 389                                &r2net_unreg_list);
 390        if (status)
 391                goto bail;
 392
 393        pr_info("ramster: r2net handlers registered\n");
 394
 395bail:
 396        if (status) {
 397                r2net_unregister_handlers();
 398                pr_err("ramster: couldn't register r2net handlers\n");
 399        }
 400        return status;
 401}
 402