linux/include/linux/ceph/messenger.h
<<
>>
Prefs
   1#ifndef __FS_CEPH_MESSENGER_H
   2#define __FS_CEPH_MESSENGER_H
   3
   4#include <linux/bvec.h>
   5#include <linux/kref.h>
   6#include <linux/mutex.h>
   7#include <linux/net.h>
   8#include <linux/radix-tree.h>
   9#include <linux/uio.h>
  10#include <linux/workqueue.h>
  11#include <net/net_namespace.h>
  12
  13#include <linux/ceph/types.h>
  14#include <linux/ceph/buffer.h>
  15
  16struct ceph_msg;
  17struct ceph_connection;
  18
  19/*
  20 * Ceph defines these callbacks for handling connection events.
  21 */
  22struct ceph_connection_operations {
  23        struct ceph_connection *(*get)(struct ceph_connection *);
  24        void (*put)(struct ceph_connection *);
  25
  26        /* handle an incoming message. */
  27        void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
  28
  29        /* authorize an outgoing connection */
  30        struct ceph_auth_handshake *(*get_authorizer) (
  31                                struct ceph_connection *con,
  32                               int *proto, int force_new);
  33        int (*verify_authorizer_reply) (struct ceph_connection *con);
  34        int (*invalidate_authorizer)(struct ceph_connection *con);
  35
  36        /* there was some error on the socket (disconnect, whatever) */
  37        void (*fault) (struct ceph_connection *con);
  38
  39        /* a remote host as terminated a message exchange session, and messages
  40         * we sent (or they tried to send us) may be lost. */
  41        void (*peer_reset) (struct ceph_connection *con);
  42
  43        struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
  44                                        struct ceph_msg_header *hdr,
  45                                        int *skip);
  46
  47        void (*reencode_message) (struct ceph_msg *msg);
  48
  49        int (*sign_message) (struct ceph_msg *msg);
  50        int (*check_message_signature) (struct ceph_msg *msg);
  51};
  52
  53/* use format string %s%d */
  54#define ENTITY_NAME(n) ceph_entity_type_name((n).type), le64_to_cpu((n).num)
  55
  56struct ceph_messenger {
  57        struct ceph_entity_inst inst;    /* my name+address */
  58        struct ceph_entity_addr my_enc_addr;
  59
  60        atomic_t stopping;
  61        possible_net_t net;
  62
  63        /*
  64         * the global_seq counts connections i (attempt to) initiate
  65         * in order to disambiguate certain connect race conditions.
  66         */
  67        u32 global_seq;
  68        spinlock_t global_seq_lock;
  69};
  70
  71enum ceph_msg_data_type {
  72        CEPH_MSG_DATA_NONE,     /* message contains no data payload */
  73        CEPH_MSG_DATA_PAGES,    /* data source/destination is a page array */
  74        CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
  75#ifdef CONFIG_BLOCK
  76        CEPH_MSG_DATA_BIO,      /* data source/destination is a bio list */
  77#endif /* CONFIG_BLOCK */
  78};
  79
  80static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
  81{
  82        switch (type) {
  83        case CEPH_MSG_DATA_NONE:
  84        case CEPH_MSG_DATA_PAGES:
  85        case CEPH_MSG_DATA_PAGELIST:
  86#ifdef CONFIG_BLOCK
  87        case CEPH_MSG_DATA_BIO:
  88#endif /* CONFIG_BLOCK */
  89                return true;
  90        default:
  91                return false;
  92        }
  93}
  94
  95struct ceph_msg_data {
  96        struct list_head                links;  /* ceph_msg->data */
  97        enum ceph_msg_data_type         type;
  98        union {
  99#ifdef CONFIG_BLOCK
 100                struct {
 101                        struct bio      *bio;
 102                        size_t          bio_length;
 103                };
 104#endif /* CONFIG_BLOCK */
 105                struct {
 106                        struct page     **pages;        /* NOT OWNER. */
 107                        size_t          length;         /* total # bytes */
 108                        unsigned int    alignment;      /* first page */
 109                };
 110                struct ceph_pagelist    *pagelist;
 111        };
 112};
 113
 114struct ceph_msg_data_cursor {
 115        size_t                  total_resid;    /* across all data items */
 116        struct list_head        *data_head;     /* = &ceph_msg->data */
 117
 118        struct ceph_msg_data    *data;          /* current data item */
 119        size_t                  resid;          /* bytes not yet consumed */
 120        bool                    last_piece;     /* current is last piece */
 121        bool                    need_crc;       /* crc update needed */
 122        union {
 123#ifdef CONFIG_BLOCK
 124                struct {                                /* bio */
 125                        struct bio      *bio;           /* bio from list */
 126                        struct bvec_iter bvec_iter;
 127                };
 128#endif /* CONFIG_BLOCK */
 129                struct {                                /* pages */
 130                        unsigned int    page_offset;    /* offset in page */
 131                        unsigned short  page_index;     /* index in array */
 132                        unsigned short  page_count;     /* pages in array */
 133                };
 134                struct {                                /* pagelist */
 135                        struct page     *page;          /* page from list */
 136                        size_t          offset;         /* bytes from list */
 137                };
 138        };
 139};
 140
 141/*
 142 * a single message.  it contains a header (src, dest, message type, etc.),
 143 * footer (crc values, mainly), a "front" message body, and possibly a
 144 * data payload (stored in some number of pages).
 145 */
 146struct ceph_msg {
 147        struct ceph_msg_header hdr;     /* header */
 148        union {
 149                struct ceph_msg_footer footer;          /* footer */
 150                struct ceph_msg_footer_old old_footer;  /* old format footer */
 151        };
 152        struct kvec front;              /* unaligned blobs of message */
 153        struct ceph_buffer *middle;
 154
 155        size_t                          data_length;
 156        struct list_head                data;
 157        struct ceph_msg_data_cursor     cursor;
 158
 159        struct ceph_connection *con;
 160        struct list_head list_head;     /* links for connection lists */
 161
 162        struct kref kref;
 163        bool more_to_follow;
 164        bool needs_out_seq;
 165        int front_alloc_len;
 166        unsigned long ack_stamp;        /* tx: when we were acked */
 167
 168        struct ceph_msgpool *pool;
 169};
 170
 171/* ceph connection fault delay defaults, for exponential backoff */
 172#define BASE_DELAY_INTERVAL     (HZ/2)
 173#define MAX_DELAY_INTERVAL      (5 * 60 * HZ)
 174
 175/*
 176 * A single connection with another host.
 177 *
 178 * We maintain a queue of outgoing messages, and some session state to
 179 * ensure that we can preserve the lossless, ordered delivery of
 180 * messages in the case of a TCP disconnect.
 181 */
 182struct ceph_connection {
 183        void *private;
 184
 185        const struct ceph_connection_operations *ops;
 186
 187        struct ceph_messenger *msgr;
 188
 189        atomic_t sock_state;
 190        struct socket *sock;
 191        struct ceph_entity_addr peer_addr; /* peer address */
 192        struct ceph_entity_addr peer_addr_for_me;
 193
 194        unsigned long flags;
 195        unsigned long state;
 196        const char *error_msg;  /* error message, if any */
 197
 198        struct ceph_entity_name peer_name; /* peer name */
 199
 200        u64 peer_features;
 201        u32 connect_seq;      /* identify the most recent connection
 202                                 attempt for this connection, client */
 203        u32 peer_global_seq;  /* peer's global seq for this connection */
 204
 205        int auth_retry;       /* true if we need a newer authorizer */
 206        void *auth_reply_buf;   /* where to put the authorizer reply */
 207        int auth_reply_buf_len;
 208
 209        struct mutex mutex;
 210
 211        /* out queue */
 212        struct list_head out_queue;
 213        struct list_head out_sent;   /* sending or sent but unacked */
 214        u64 out_seq;                 /* last message queued for send */
 215
 216        u64 in_seq, in_seq_acked;  /* last message received, acked */
 217
 218        /* connection negotiation temps */
 219        char in_banner[CEPH_BANNER_MAX_LEN];
 220        struct ceph_msg_connect out_connect;
 221        struct ceph_msg_connect_reply in_reply;
 222        struct ceph_entity_addr actual_peer_addr;
 223
 224        /* message out temps */
 225        struct ceph_msg_header out_hdr;
 226        struct ceph_msg *out_msg;        /* sending message (== tail of
 227                                            out_sent) */
 228        bool out_msg_done;
 229
 230        struct kvec out_kvec[8],         /* sending header/footer data */
 231                *out_kvec_cur;
 232        int out_kvec_left;   /* kvec's left in out_kvec */
 233        int out_skip;        /* skip this many bytes */
 234        int out_kvec_bytes;  /* total bytes left */
 235        int out_more;        /* there is more data after the kvecs */
 236        __le64 out_temp_ack; /* for writing an ack */
 237        struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
 238                                                     stamp */
 239
 240        /* message in temps */
 241        struct ceph_msg_header in_hdr;
 242        struct ceph_msg *in_msg;
 243        u32 in_front_crc, in_middle_crc, in_data_crc;  /* calculated crc */
 244
 245        char in_tag;         /* protocol control byte */
 246        int in_base_pos;     /* bytes read */
 247        __le64 in_temp_ack;  /* for reading an ack */
 248
 249        struct timespec last_keepalive_ack; /* keepalive2 ack stamp */
 250
 251        struct delayed_work work;           /* send|recv work */
 252        unsigned long       delay;          /* current delay interval */
 253};
 254
 255
 256extern const char *ceph_pr_addr(const struct sockaddr_storage *ss);
 257extern int ceph_parse_ips(const char *c, const char *end,
 258                          struct ceph_entity_addr *addr,
 259                          int max_count, int *count);
 260
 261
 262extern int ceph_msgr_init(void);
 263extern void ceph_msgr_exit(void);
 264extern void ceph_msgr_flush(void);
 265
 266extern void ceph_messenger_init(struct ceph_messenger *msgr,
 267                                struct ceph_entity_addr *myaddr);
 268extern void ceph_messenger_fini(struct ceph_messenger *msgr);
 269
 270extern void ceph_con_init(struct ceph_connection *con, void *private,
 271                        const struct ceph_connection_operations *ops,
 272                        struct ceph_messenger *msgr);
 273extern void ceph_con_open(struct ceph_connection *con,
 274                          __u8 entity_type, __u64 entity_num,
 275                          struct ceph_entity_addr *addr);
 276extern bool ceph_con_opened(struct ceph_connection *con);
 277extern void ceph_con_close(struct ceph_connection *con);
 278extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 279
 280extern void ceph_msg_revoke(struct ceph_msg *msg);
 281extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
 282
 283extern void ceph_con_keepalive(struct ceph_connection *con);
 284extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
 285                                       unsigned long interval);
 286
 287extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 288                                size_t length, size_t alignment);
 289extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 290                                struct ceph_pagelist *pagelist);
 291#ifdef CONFIG_BLOCK
 292extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 293                                size_t length);
 294#endif /* CONFIG_BLOCK */
 295
 296extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 297                                     bool can_fail);
 298
 299extern struct ceph_msg *ceph_msg_get(struct ceph_msg *msg);
 300extern void ceph_msg_put(struct ceph_msg *msg);
 301
 302extern void ceph_msg_dump(struct ceph_msg *msg);
 303
 304#endif
 305