linux/drivers/staging/zcache/ramster/ramster.c
/*
 * ramster.c
 *
 * Copyright (c) 2010-2012, Dan Magenheimer, Oracle Corp.
 *
 * RAMster implements peer-to-peer transcendent memory, allowing a "cluster" of
 * kernels to dynamically pool their RAM so that a RAM-hungry workload on one
 * machine can temporarily and transparently utilize RAM on another machine
 * which is presumably idle or running a non-RAM-hungry workload.
 *
 * RAMster combines a clustering and messaging foundation based on the ocfs2
 * cluster layer with the in-kernel compression implementation of zcache, and
 * adds code to glue them together.  When a page is "put" to RAMster, it is
 * compressed and stored locally.  Periodically, a thread will "remotify" these
 * pages by sending them via messages to a remote machine.  When the page is
 * later needed as indicated by a page fault, a "get" is issued.  If the data
 * is local, it is uncompressed and the fault is resolved.  If the data is
 * remote, a message is sent to fetch the data and the faulting thread sleeps;
 * when the data arrives, the thread awakens, the data is decompressed and
 * the fault is resolved.
 *
 * As of V5, clusters up to eight nodes are supported; each node can remotify
 * pages to one specified node, so clusters can be configured as clients to
 * a "memory server".  Some simple policy is in place that will need to be
 * refined over time.  Larger clusters and fault-resistant protocols can also
 * be added over time.
 */

#include <linux/module.h>
#include <linux/cpu.h>
#include <linux/highmem.h>
#include <linux/list.h>
#include <linux/lzo.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/frontswap.h>
#include "../tmem.h"
#include "../zcache.h"
#include "../zbud.h"
#include "ramster.h"
#include "ramster_nodemanager.h"
#include "tcp.h"
#include "debug.h"

#define RAMSTER_TESTING

#ifndef CONFIG_SYSFS
#error "ramster needs sysfs to define cluster nodes to use"
#endif

static bool use_cleancache __read_mostly;
static bool use_frontswap __read_mostly;
static bool use_frontswap_exclusive_gets __read_mostly;

/* These must be sysfs not debugfs as they are checked/used by userland!! */
static unsigned long ramster_interface_revision __read_mostly =
	R2NM_API_VERSION; /* interface revision must match userspace! */
static unsigned long ramster_pers_remotify_enable __read_mostly;
static unsigned long ramster_eph_remotify_enable __read_mostly;
static atomic_t ramster_remote_pers_pages = ATOMIC_INIT(0);
#define MANUAL_NODES 8
static bool ramster_nodes_manual_up[MANUAL_NODES] __read_mostly;
static int ramster_remote_target_nodenum __read_mostly = -1;

/* Used by this code. */
long ramster_flnodes;
/* FIXME frontswap selfshrinking knobs in debugfs? */

static LIST_HEAD(ramster_rem_op_list);
static DEFINE_SPINLOCK(ramster_rem_op_list_lock);
static DEFINE_PER_CPU(struct ramster_preload, ramster_preloads);

static DEFINE_PER_CPU(unsigned char *, ramster_remoteputmem1);
static DEFINE_PER_CPU(unsigned char *, ramster_remoteputmem2);

static struct kmem_cache *ramster_flnode_cache __read_mostly;

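/*
 * Grab the flushlist_node preloaded for this cpu by
 * ramster_do_preload_flnode(); the preload slot is emptied, so the next
 * allocation must be preceded by another preload.
 */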
static struct flushlist_node *ramster_flnode_alloc(struct tmem_pool *pool)
{
	struct flushlist_node *flnode = NULL;
	struct ramster_preload *kp;

	kp = &__get_cpu_var(ramster_preloads);
	flnode = kp->flnode;
	BUG_ON(flnode == NULL);
	kp->flnode = NULL;
	inc_ramster_flnodes();
	return flnode;
}

/* the "flush list" asynchronously collects pages to remotely flush */
#define FLUSH_ENTIRE_OBJECT ((uint32_t)-1)
static void ramster_flnode_free(struct flushlist_node *flnode,
				struct tmem_pool *pool)
{
	dec_ramster_flnodes();
	BUG_ON(ramster_flnodes < 0);
	kmem_cache_free(ramster_flnode_cache, flnode);
}

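/*
 * Ensure this cpu has a preloaded flushlist_node so that a later
 * ramster_flnode_alloc() (called with locks held) cannot come up empty.
 * Must be called with interrupts disabled.
 */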
int ramster_do_preload_flnode(struct tmem_pool *pool)
{
	struct ramster_preload *kp;
	struct flushlist_node *flnode;
	int ret = -ENOMEM;

	BUG_ON(!irqs_disabled());
	if (unlikely(ramster_flnode_cache == NULL))
		BUG();
	kp = &__get_cpu_var(ramster_preloads);
	flnode = kmem_cache_alloc(ramster_flnode_cache, GFP_ATOMIC);
	if (unlikely(flnode == NULL) && kp->flnode == NULL)
		BUG();	/* FIXME handle more gracefully, but how??? */
	else if (kp->flnode == NULL)
		kp->flnode = flnode;
	else
		kmem_cache_free(ramster_flnode_cache, flnode);
	return ret;
}
EXPORT_SYMBOL_GPL(ramster_do_preload_flnode);

/*
 * Called by the message handler after a (still compressed) page has been
 * fetched from the remote machine in response to an "is_remote" tmem_get
 * or persistent tmem_localify.  For a tmem_get, "extra" is the address of
 * the page that is to be filled to successfully resolve the tmem_get; for
 * a (persistent) tmem_localify, "extra" is NULL (as the data is placed only
 * in the local zcache).  "data" points to "size" bytes of (compressed) data
 * passed in the message.  In the case of a persistent remote get, if
 * pre-allocation was successful (see ramster_repatriate_preload), the page
 * is placed into both local zcache and at "extra".
 */
int ramster_localify(int pool_id, struct tmem_oid *oidp, uint32_t index,
			char *data, unsigned int size, void *extra)
{
	int ret = -ENOENT;
	unsigned long flags;
	struct tmem_pool *pool;
	bool eph, delete = false;
	void *pampd, *saved_hb;
	struct tmem_obj *obj;

	pool = zcache_get_pool_by_id(LOCAL_CLIENT, pool_id);
	if (unlikely(pool == NULL))
		/* pool doesn't exist anymore */
		goto out;
	eph = is_ephemeral(pool);
	local_irq_save(flags);  /* FIXME: maybe only disable softirqs? */
	pampd = tmem_localify_get_pampd(pool, oidp, index, &obj, &saved_hb);
	if (pampd == NULL) {
		/* hmmm... must have been a flush while waiting */
#ifdef RAMSTER_TESTING
		pr_err("UNTESTED pampd==NULL in ramster_localify\n");
#endif
		if (eph)
			inc_ramster_remote_eph_pages_unsucc_get();
		else
			inc_ramster_remote_pers_pages_unsucc_get();
		obj = NULL;
		goto finish;
	} else if (unlikely(!pampd_is_remote(pampd))) {
		/* hmmm... must have been a dup put while waiting */
#ifdef RAMSTER_TESTING
		pr_err("UNTESTED dup while waiting in ramster_localify\n");
#endif
		if (eph)
			inc_ramster_remote_eph_pages_unsucc_get();
		else
			inc_ramster_remote_pers_pages_unsucc_get();
		obj = NULL;
		pampd = NULL;
		ret = -EEXIST;
		goto finish;
	} else if (size == 0) {
		/* no remote data, delete the local is_remote pampd */
		pampd = NULL;
		if (eph)
			inc_ramster_remote_eph_pages_unsucc_get();
		else
			BUG();
		delete = true;
		goto finish;
	}
	if (pampd_is_intransit(pampd)) {
		/*
		 *  a pampd is marked intransit if it is remote and space has
		 *  been allocated for it locally (note, only happens for
		 *  persistent pages, in which case the remote copy is freed)
		 */
		BUG_ON(eph);
		pampd = pampd_mask_intransit_and_remote(pampd);
		zbud_copy_to_zbud(pampd, data, size);
	} else {
		/*
		 * setting pampd to NULL tells tmem_localify_finish to leave
		 * pampd alone... meaning it is left pointing to the
		 * remote copy
		 */
		pampd = NULL;
		obj = NULL;
	}
	/*
	 * but in all cases, we decompress direct-to-memory to complete
	 * the remotify and return success
	 */
	BUG_ON(extra == NULL);
	zcache_decompress_to_page(data, size, (struct page *)extra);
	if (eph)
		inc_ramster_remote_eph_pages_succ_get();
	else
		inc_ramster_remote_pers_pages_succ_get();
	ret = 0;
finish:
	tmem_localify_finish(obj, index, pampd, saved_hb, delete);
	zcache_put_pool(pool);
	local_irq_restore(flags);
out:
	return ret;
}

void ramster_pampd_new_obj(struct tmem_obj *obj)
{
	obj->extra = NULL;
}

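/*
 * Called when a tmem_obj is being destroyed.  If any of the object's pages
 * were remotified, queue a "flush object" operation so the remote node's
 * copies are eventually discarded by the remotify worker.
 */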
void ramster_pampd_free_obj(struct tmem_pool *pool, struct tmem_obj *obj,
				bool pool_destroy)
{
	struct flushlist_node *flnode;

	BUG_ON(preemptible());
	if (obj->extra == NULL)
		return;
	if (pool_destroy && is_ephemeral(pool))
		/* FIXME don't bother with remote eph data for now */
		return;
	BUG_ON(!pampd_is_remote(obj->extra));
	flnode = ramster_flnode_alloc(pool);
	flnode->xh.client_id = pampd_remote_node(obj->extra);
	flnode->xh.pool_id = pool->pool_id;
	flnode->xh.oid = obj->oid;
	flnode->xh.index = FLUSH_ENTIRE_OBJECT;
	flnode->rem_op.op = RAMSTER_REMOTIFY_FLUSH_OBJ;
	spin_lock(&ramster_rem_op_list_lock);
	list_add(&flnode->rem_op.list, &ramster_rem_op_list);
	spin_unlock(&ramster_rem_op_list_lock);
}

/*
 * Called on a remote persistent tmem_get to attempt to preallocate
 * local storage for the data contained in the remote persistent page.
 * If successfully preallocated, returns the pampd, marked as remote and
 * in_transit.  Else returns NULL.  Note that the appropriate tmem data
 * structure must be locked.
 */
void *ramster_pampd_repatriate_preload(void *pampd, struct tmem_pool *pool,
					struct tmem_oid *oidp, uint32_t index,
					bool *intransit)
{
	int clen = pampd_remote_size(pampd), c;
	void *ret_pampd = NULL;
	unsigned long flags;
	struct tmem_handle th;

	BUG_ON(!pampd_is_remote(pampd));
	BUG_ON(is_ephemeral(pool));
	if (use_frontswap_exclusive_gets)
		/* don't need local storage */
		goto out;
	if (pampd_is_intransit(pampd)) {
		/*
		 * to avoid multiple allocations (and maybe a memory leak)
		 * don't preallocate if already in the process of being
		 * repatriated
		 */
		*intransit = true;
		goto out;
	}
	*intransit = false;
	local_irq_save(flags);
	th.client_id = pampd_remote_node(pampd);
	th.pool_id = pool->pool_id;
	th.oid = *oidp;
	th.index = index;
	ret_pampd = zcache_pampd_create(NULL, clen, true, false, &th);
	if (ret_pampd != NULL) {
		/*
		 *  a pampd is marked intransit if it is remote and space has
		 *  been allocated for it locally (note, only happens for
		 *  persistent pages, in which case the remote copy is freed)
		 */
		ret_pampd = pampd_mark_intransit(ret_pampd);
		c = atomic_dec_return(&ramster_remote_pers_pages);
		WARN_ON_ONCE(c < 0);
	} else {
		inc_ramster_pers_pages_remote_nomem();
	}
	local_irq_restore(flags);
out:
	return ret_pampd;
}

/*
 * Called on a remote tmem_get to invoke a message to fetch the page.
 * Might sleep so no tmem locks can be held.  "extra" is passed
 * all the way through the round-trip messaging to ramster_localify.
 */
int ramster_pampd_repatriate(void *fake_pampd, void *real_pampd,
				struct tmem_pool *pool,
				struct tmem_oid *oid, uint32_t index,
				bool free, void *extra)
{
	struct tmem_xhandle xh;
	int ret;

	if (pampd_is_intransit(real_pampd))
		/* have local space pre-reserved, so free remote copy */
		free = true;
	xh = tmem_xhandle_fill(LOCAL_CLIENT, pool, oid, index);
	/* unreliable request/response for now */
	ret = r2net_remote_async_get(&xh, free,
					pampd_remote_node(fake_pampd),
					pampd_remote_size(fake_pampd),
					pampd_remote_cksum(fake_pampd),
					extra);
	return ret;
}

bool ramster_pampd_is_remote(void *pampd)
{
	return pampd_is_remote(pampd);
}

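/*
 * Record the first remote pampd in obj->extra and insist that every
 * subsequent remote page in the same object lands on the same node.
 */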
int ramster_pampd_replace_in_obj(void *new_pampd, struct tmem_obj *obj)
{
	int ret = -1;

	if (new_pampd != NULL) {
		if (obj->extra == NULL)
			obj->extra = new_pampd;
		/* enforce that all remote pages in an object reside
		 * in the same node! */
		else if (pampd_remote_node(new_pampd) !=
				pampd_remote_node((void *)(obj->extra)))
			BUG();
		ret = 0;
	}
	return ret;
}

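/*
 * Free the local accounting for a remote pampd.  If the page is a
 * persistent page already repatriated (marked in transit), return the
 * local pampd so the caller can free it; otherwise queue a "flush page"
 * operation for the remotify worker.  Remote ephemeral copies are not
 * flushed for now (see the FIXME below).
 */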
void *ramster_pampd_free(void *pampd, struct tmem_pool *pool,
			      struct tmem_oid *oid, uint32_t index, bool acct)
{
	bool eph = is_ephemeral(pool);
	void *local_pampd = NULL;
	int c;

	BUG_ON(preemptible());
	BUG_ON(!pampd_is_remote(pampd));
	WARN_ON(acct == false);
	if (oid == NULL) {
		/*
		 * a NULL oid means to ignore this pampd free
		 * as the remote freeing will be handled elsewhere
		 */
	} else if (eph) {
		/* FIXME remote flush optional but probably good idea */
	} else if (pampd_is_intransit(pampd)) {
		/* did a pers remote get_and_free, so just free local */
		local_pampd = pampd_mask_intransit_and_remote(pampd);
	} else {
		struct flushlist_node *flnode =
			ramster_flnode_alloc(pool);

		flnode->xh.client_id = pampd_remote_node(pampd);
		flnode->xh.pool_id = pool->pool_id;
		flnode->xh.oid = *oid;
		flnode->xh.index = index;
		flnode->rem_op.op = RAMSTER_REMOTIFY_FLUSH_PAGE;
		spin_lock(&ramster_rem_op_list_lock);
		list_add(&flnode->rem_op.list, &ramster_rem_op_list);
		spin_unlock(&ramster_rem_op_list_lock);
		c = atomic_dec_return(&ramster_remote_pers_pages);
		WARN_ON_ONCE(c < 0);
	}
	return local_pampd;
}
EXPORT_SYMBOL_GPL(ramster_pampd_free);

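/*
 * Adjust the count of "foreign" pages (pages put on behalf of a remote
 * node) by exactly +1 or -1, split by ephemeral vs persistent.
 */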
void ramster_count_foreign_pages(bool eph, int count)
{
	BUG_ON(count != 1 && count != -1);
	if (eph) {
		if (count > 0) {
			inc_ramster_foreign_eph_pages();
		} else {
			dec_ramster_foreign_eph_pages();
#ifdef CONFIG_RAMSTER_DEBUG
			WARN_ON_ONCE(ramster_foreign_eph_pages < 0);
#endif
		}
	} else {
		if (count > 0) {
			inc_ramster_foreign_pers_pages();
		} else {
			dec_ramster_foreign_pers_pages();
#ifdef CONFIG_RAMSTER_DEBUG
			WARN_ON_ONCE(ramster_foreign_pers_pages < 0);
#endif
		}
	}
}
EXPORT_SYMBOL_GPL(ramster_count_foreign_pages);

/*
 * For now, just push over a few pages every few seconds to
 * ensure that it basically works
 */
static struct workqueue_struct *ramster_remotify_workqueue;
static void ramster_remotify_process(struct work_struct *work);
static DECLARE_DELAYED_WORK(ramster_remotify_worker,
		ramster_remotify_process);

static void ramster_remotify_queue_delayed_work(unsigned long delay)
{
	if (!queue_delayed_work(ramster_remotify_workqueue,
				&ramster_remotify_worker, delay))
		pr_err("ramster_remotify: bad workqueue\n");
}

static void ramster_remote_flush_page(struct flushlist_node *flnode)
{
	struct tmem_xhandle *xh;
	int remotenode, ret;

	preempt_disable();
	xh = &flnode->xh;
	remotenode = flnode->xh.client_id;
	ret = r2net_remote_flush(xh, remotenode);
	if (ret >= 0)
		inc_ramster_remote_pages_flushed();
	else
		inc_ramster_remote_page_flushes_failed();
	preempt_enable_no_resched();
	ramster_flnode_free(flnode, NULL);
}

static void ramster_remote_flush_object(struct flushlist_node *flnode)
{
	struct tmem_xhandle *xh;
	int remotenode, ret;

	preempt_disable();
	xh = &flnode->xh;
	remotenode = flnode->xh.client_id;
	ret = r2net_remote_flush_object(xh, remotenode);
	if (ret >= 0)
		inc_ramster_remote_objects_flushed();
	else
		inc_ramster_remote_object_flushes_failed();
	preempt_enable_no_resched();
	ramster_flnode_free(flnode, NULL);
}

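/*
 * Pull up to two compressed pages ("zbuds") off the zombie LRU and send
 * them to the remote target node.  On success, the local tmem entry is
 * replaced with a "remote" pampd recording the node, size and checksum;
 * on failure the page is counted as lost (see the FIXME below).  Returns
 * the number of zbuds taken off the LRU.
 */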
int ramster_remotify_pageframe(bool eph)
{
	struct tmem_xhandle xh;
	unsigned int size;
	int remotenode, ret, zbuds;
	struct tmem_pool *pool;
	unsigned long flags;
	unsigned char cksum;
	char *p;
	int i, j;
	unsigned char *tmpmem[2];
	struct tmem_handle th[2];
	unsigned int zsize[2];

	tmpmem[0] = __get_cpu_var(ramster_remoteputmem1);
	tmpmem[1] = __get_cpu_var(ramster_remoteputmem2);
	local_bh_disable();
	zbuds = zbud_make_zombie_lru(&th[0], &tmpmem[0], &zsize[0], eph);
	/* now OK to release lock set in caller */
	local_bh_enable();
	if (zbuds == 0)
		goto out;
	BUG_ON(zbuds > 2);
	for (i = 0; i < zbuds; i++) {
		xh.client_id = th[i].client_id;
		xh.pool_id = th[i].pool_id;
		xh.oid = th[i].oid;
		xh.index = th[i].index;
		size = zsize[i];
		BUG_ON(size == 0 || size > zbud_max_buddy_size());
		for (p = tmpmem[i], cksum = 0, j = 0; j < size; j++)
			cksum += *p++;
		ret = r2net_remote_put(&xh, tmpmem[i], size, eph, &remotenode);
		if (ret != 0) {
		/*
		 * This is some form of a memory leak... if the remote put
		 * fails, there will never be another attempt to remotify
		 * this page.  But since we've dropped the zv pointer,
		 * the page may have been freed or the data replaced
		 * so we can't just "put it back" in the remote op list.
		 * Even if we could, not sure where to put it in the list
		 * because there may be flushes that must be strictly
		 * ordered vs the put.  So leave this as a FIXME for now.
		 * But count them so we know if it becomes a problem.
		 */
			if (eph)
				inc_ramster_eph_pages_remote_failed();
			else
				inc_ramster_pers_pages_remote_failed();
			break;
		} else {
			if (!eph)
				atomic_inc(&ramster_remote_pers_pages);
		}
		if (eph)
			inc_ramster_eph_pages_remoted();
		else
			inc_ramster_pers_pages_remoted();
		/*
		 * data was successfully remoted so change the local version to
		 * point to the remote node where it landed
		 */
		local_bh_disable();
		pool = zcache_get_pool_by_id(LOCAL_CLIENT, xh.pool_id);
		local_irq_save(flags);
		(void)tmem_replace(pool, &xh.oid, xh.index,
				pampd_make_remote(remotenode, size, cksum));
		local_irq_restore(flags);
		zcache_put_pool(pool);
		local_bh_enable();
	}
out:
	return zbuds;
}

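/*
 * Drain the queued flush operations, sending each "flush page" or
 * "flush object" request to the node recorded in its flushlist_node.
 */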
static void zcache_do_remotify_flushes(void)
{
	struct ramster_remotify_hdr *rem_op;
	union remotify_list_node *u;

	while (1) {
		spin_lock(&ramster_rem_op_list_lock);
		if (list_empty(&ramster_rem_op_list)) {
			spin_unlock(&ramster_rem_op_list_lock);
			goto out;
		}
		rem_op = list_first_entry(&ramster_rem_op_list,
				struct ramster_remotify_hdr, list);
		list_del_init(&rem_op->list);
		spin_unlock(&ramster_rem_op_list_lock);
		u = (union remotify_list_node *)rem_op;
		switch (rem_op->op) {
		case RAMSTER_REMOTIFY_FLUSH_PAGE:
			ramster_remote_flush_page((struct flushlist_node *)u);
			break;
		case RAMSTER_REMOTIFY_FLUSH_OBJ:
			ramster_remote_flush_object((struct flushlist_node *)u);
			break;
		default:
			BUG();
		}
	}
out:
	return;
}

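/*
 * Delayed-work handler: if a remotification target node has been set and
 * no other invocation is in progress, push batches of ephemeral and/or
 * persistent pages (plus any pending flushes) to the remote node, then
 * requeue itself to run again in one second.
 */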
static void ramster_remotify_process(struct work_struct *work)
{
	static bool remotify_in_progress;
	int i;

	BUG_ON(irqs_disabled());
	if (remotify_in_progress)
		goto requeue;
	if (ramster_remote_target_nodenum == -1)
		goto requeue;
	remotify_in_progress = true;
	if (use_cleancache && ramster_eph_remotify_enable) {
		for (i = 0; i < 100; i++) {
			zcache_do_remotify_flushes();
			(void)ramster_remotify_pageframe(true);
		}
	}
	if (use_frontswap && ramster_pers_remotify_enable) {
		for (i = 0; i < 100; i++) {
			zcache_do_remotify_flushes();
			(void)ramster_remotify_pageframe(false);
		}
	}
	remotify_in_progress = false;
requeue:
	ramster_remotify_queue_delayed_work(HZ);
}

void ramster_remotify_init(void)
{
	unsigned long n = 60UL;
	ramster_remotify_workqueue =
		create_singlethread_workqueue("ramster_remotify");
	ramster_remotify_queue_delayed_work(n * HZ);
}

static ssize_t ramster_manual_node_up_show(struct kobject *kobj,
				struct kobj_attribute *attr, char *buf)
{
	int i;
	char *p = buf;
	for (i = 0; i < MANUAL_NODES; i++)
		if (ramster_nodes_manual_up[i])
			p += sprintf(p, "%d ", i);
	p += sprintf(p, "\n");
	return p - buf;
}

static ssize_t ramster_manual_node_up_store(struct kobject *kobj,
		struct kobj_attribute *attr, const char *buf, size_t count)
{
	int err;
	unsigned long node_num;

	err = kstrtoul(buf, 10, &node_num);
	if (err) {
		pr_err("ramster: bad strtoul?\n");
		return -EINVAL;
	}
	if (node_num >= MANUAL_NODES) {
		pr_err("ramster: bad node_num=%lu?\n", node_num);
		return -EINVAL;
	}
	if (ramster_nodes_manual_up[node_num]) {
		pr_err("ramster: node %d already up, ignoring\n",
							(int)node_num);
	} else {
		ramster_nodes_manual_up[node_num] = true;
		r2net_hb_node_up_manual((int)node_num);
	}
	return count;
}

static struct kobj_attribute ramster_manual_node_up_attr = {
	.attr = { .name = "manual_node_up", .mode = 0644 },
	.show = ramster_manual_node_up_show,
	.store = ramster_manual_node_up_store,
};

static ssize_t ramster_remote_target_nodenum_show(struct kobject *kobj,
				struct kobj_attribute *attr, char *buf)
{
	if (ramster_remote_target_nodenum == -1UL)
		return sprintf(buf, "unset\n");
	else
		return sprintf(buf, "%d\n", ramster_remote_target_nodenum);
}

static ssize_t ramster_remote_target_nodenum_store(struct kobject *kobj,
		struct kobj_attribute *attr, const char *buf, size_t count)
{
	int err;
	unsigned long node_num;

	err = kstrtoul(buf, 10, &node_num);
	if (err) {
		pr_err("ramster: bad strtoul?\n");
		return -EINVAL;
	} else if (node_num == -1UL) {
		pr_err("ramster: disabling all remotification, "
			"data may still reside on remote nodes however\n");
		return -EINVAL;
	} else if (node_num >= MANUAL_NODES) {
		pr_err("ramster: bad node_num=%lu?\n", node_num);
		return -EINVAL;
	} else if (!ramster_nodes_manual_up[node_num]) {
		pr_err("ramster: node %d not up, ignoring setting "
			"of remotification target\n", (int)node_num);
	} else if (r2net_remote_target_node_set((int)node_num) >= 0) {
		pr_info("ramster: node %d set as remotification target\n",
				(int)node_num);
		ramster_remote_target_nodenum = (int)node_num;
	} else {
		pr_err("ramster: bad num to node node_num=%d?\n",
				(int)node_num);
		return -EINVAL;
	}
	return count;
}

static struct kobj_attribute ramster_remote_target_nodenum_attr = {
	.attr = { .name = "remote_target_nodenum", .mode = 0644 },
	.show = ramster_remote_target_nodenum_show,
	.store = ramster_remote_target_nodenum_store,
};

#define RAMSTER_SYSFS_RO(_name) \
	static ssize_t ramster_##_name##_show(struct kobject *kobj, \
				struct kobj_attribute *attr, char *buf) \
	{ \
		return sprintf(buf, "%lu\n", ramster_##_name); \
	} \
	static struct kobj_attribute ramster_##_name##_attr = { \
		.attr = { .name = __stringify(_name), .mode = 0444 }, \
		.show = ramster_##_name##_show, \
	}

#define RAMSTER_SYSFS_RW(_name) \
	static ssize_t ramster_##_name##_show(struct kobject *kobj, \
				struct kobj_attribute *attr, char *buf) \
	{ \
		return sprintf(buf, "%lu\n", ramster_##_name); \
	} \
	static ssize_t ramster_##_name##_store(struct kobject *kobj, \
		struct kobj_attribute *attr, const char *buf, size_t count) \
	{ \
		int err; \
		unsigned long enable; \
		err = kstrtoul(buf, 10, &enable); \
		if (err) \
			return -EINVAL; \
		ramster_##_name = enable; \
		return count; \
	} \
	static struct kobj_attribute ramster_##_name##_attr = { \
		.attr = { .name = __stringify(_name), .mode = 0644 }, \
		.show = ramster_##_name##_show, \
		.store = ramster_##_name##_store, \
	}

#define RAMSTER_SYSFS_RO_ATOMIC(_name) \
	static ssize_t ramster_##_name##_show(struct kobject *kobj, \
				struct kobj_attribute *attr, char *buf) \
	{ \
	    return sprintf(buf, "%d\n", atomic_read(&ramster_##_name)); \
	} \
	static struct kobj_attribute ramster_##_name##_attr = { \
		.attr = { .name = __stringify(_name), .mode = 0444 }, \
		.show = ramster_##_name##_show, \
	}

RAMSTER_SYSFS_RO(interface_revision);
RAMSTER_SYSFS_RO_ATOMIC(remote_pers_pages);
RAMSTER_SYSFS_RW(pers_remotify_enable);
RAMSTER_SYSFS_RW(eph_remotify_enable);

static struct attribute *ramster_attrs[] = {
	&ramster_interface_revision_attr.attr,
	&ramster_remote_pers_pages_attr.attr,
	&ramster_manual_node_up_attr.attr,
	&ramster_remote_target_nodenum_attr.attr,
	&ramster_pers_remotify_enable_attr.attr,
	&ramster_eph_remotify_enable_attr.attr,
	NULL,
};

static struct attribute_group ramster_attr_group = {
	.attrs = ramster_attrs,
	.name = "ramster",
};

/*
 * frontswap selfshrinking
 */

/* In HZ, controls frequency of worker invocation. */
static unsigned int selfshrink_interval __read_mostly = 5;
/* Enable/disable with sysfs. */
static bool frontswap_selfshrinking __read_mostly;

static void selfshrink_process(struct work_struct *work);
static DECLARE_DELAYED_WORK(selfshrink_worker, selfshrink_process);

#ifndef CONFIG_RAMSTER_MODULE
/* Enable/disable with kernel boot option. */
static bool use_frontswap_selfshrink = true;
#endif

/*
 * The default values for the following parameters were deemed reasonable
 * by experimentation, may be workload-dependent, and can all be
 * adjusted via sysfs.
 */

/* Control rate for frontswap shrinking. Higher hysteresis is slower. */
static unsigned int frontswap_hysteresis __read_mostly = 20;

/*
 * Number of selfshrink worker invocations to wait before observing that
 * frontswap selfshrinking should commence. Note that selfshrinking does
 * not use a separate worker thread.
 */
static unsigned int frontswap_inertia __read_mostly = 3;

/* Countdown to next invocation of frontswap_shrink() */
static unsigned long frontswap_inertia_counter;

/*
 * Invoked by the selfshrink worker thread, uses current number of pages
 * in frontswap (frontswap_curr_pages()), previous status, and control
 * values (hysteresis and inertia) to determine if frontswap should be
 * shrunk and what the new frontswap size should be.  Note that
 * frontswap_shrink is essentially a partial swapoff that immediately
 * transfers pages from the "swap device" (frontswap) back into kernel
 * RAM; despite the name, frontswap "shrinking" is very different from
 * the "shrinker" interface used by the kernel MM subsystem to reclaim
 * memory.
 */
static void frontswap_selfshrink(void)
{
	static unsigned long cur_frontswap_pages;
	static unsigned long last_frontswap_pages;
	static unsigned long tgt_frontswap_pages;

	last_frontswap_pages = cur_frontswap_pages;
	cur_frontswap_pages = frontswap_curr_pages();
	if (!cur_frontswap_pages ||
			(cur_frontswap_pages > last_frontswap_pages)) {
		frontswap_inertia_counter = frontswap_inertia;
		return;
	}
	if (frontswap_inertia_counter && --frontswap_inertia_counter)
		return;
	if (cur_frontswap_pages <= frontswap_hysteresis)
		tgt_frontswap_pages = 0;
	else
		tgt_frontswap_pages = cur_frontswap_pages -
			(cur_frontswap_pages / frontswap_hysteresis);
	frontswap_shrink(tgt_frontswap_pages);
}

#ifndef CONFIG_RAMSTER_MODULE
static int __init ramster_nofrontswap_selfshrink_setup(char *s)
{
	use_frontswap_selfshrink = false;
	return 1;
}

__setup("noselfshrink", ramster_nofrontswap_selfshrink_setup);
#endif

static void selfshrink_process(struct work_struct *work)
{
	if (frontswap_selfshrinking && frontswap_enabled) {
		frontswap_selfshrink();
		schedule_delayed_work(&selfshrink_worker,
			selfshrink_interval * HZ);
	}
}

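/*
 * Allocate (on cpu up) or free (on cpu down) the per-cpu staging buffers
 * used when remotifying pages, and release any preloaded flushlist_node
 * still held by the departing cpu.
 */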
void ramster_cpu_up(int cpu)
{
	unsigned char *p1 = kzalloc(PAGE_SIZE, GFP_KERNEL | __GFP_REPEAT);
	unsigned char *p2 = kzalloc(PAGE_SIZE, GFP_KERNEL | __GFP_REPEAT);
	BUG_ON(!p1 || !p2);
	per_cpu(ramster_remoteputmem1, cpu) = p1;
	per_cpu(ramster_remoteputmem2, cpu) = p2;
}
EXPORT_SYMBOL_GPL(ramster_cpu_up);

void ramster_cpu_down(int cpu)
{
	struct ramster_preload *kp;

	kfree(per_cpu(ramster_remoteputmem1, cpu));
	per_cpu(ramster_remoteputmem1, cpu) = NULL;
	kfree(per_cpu(ramster_remoteputmem2, cpu));
	per_cpu(ramster_remoteputmem2, cpu) = NULL;
	kp = &per_cpu(ramster_preloads, cpu);
	if (kp->flnode) {
		kmem_cache_free(ramster_flnode_cache, kp->flnode);
		kp->flnode = NULL;
	}
}
EXPORT_SYMBOL_GPL(ramster_cpu_down);

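/*
 * Hook the ramster-specific pampd operations into the pamops table used
 * by the tmem/zcache core.
 */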
void ramster_register_pamops(struct tmem_pamops *pamops)
{
	pamops->free_obj = ramster_pampd_free_obj;
	pamops->new_obj = ramster_pampd_new_obj;
	pamops->replace_in_obj = ramster_pampd_replace_in_obj;
	pamops->is_remote = ramster_pampd_is_remote;
	pamops->repatriate = ramster_pampd_repatriate;
	pamops->repatriate_preload = ramster_pampd_repatriate_preload;
}
EXPORT_SYMBOL_GPL(ramster_register_pamops);

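/*
 * One-time initialization: record which frontends are in use, create the
 * sysfs group and the flushlist_node cache, register the r2net message
 * handlers, and kick off the selfshrink and remotify workers.
 */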
void ramster_init(bool cleancache, bool frontswap,
				bool frontswap_exclusive_gets,
				bool frontswap_selfshrink)
{
	int ret = 0;

	if (cleancache)
		use_cleancache = true;
	if (frontswap)
		use_frontswap = true;
	if (frontswap_exclusive_gets)
		use_frontswap_exclusive_gets = true;
	ramster_debugfs_init();
	ret = sysfs_create_group(mm_kobj, &ramster_attr_group);
	if (ret)
		pr_err("ramster: can't create sysfs for ramster\n");
	(void)r2net_register_handlers();
#ifdef CONFIG_RAMSTER_MODULE
	ret = r2nm_init();
	if (ret)
		pr_err("ramster: can't init r2net\n");
	frontswap_selfshrinking = frontswap_selfshrink;
#else
	frontswap_selfshrinking = use_frontswap_selfshrink;
#endif
	INIT_LIST_HEAD(&ramster_rem_op_list);
	ramster_flnode_cache = kmem_cache_create("ramster_flnode",
				sizeof(struct flushlist_node), 0, 0, NULL);
	if (frontswap_selfshrinking) {
		pr_info("ramster: Initializing frontswap selfshrink driver.\n");
		schedule_delayed_work(&selfshrink_worker,
					selfshrink_interval * HZ);
	}
	ramster_remotify_init();
}
EXPORT_SYMBOL_GPL(ramster_init);