linux/include/linux/ceph/messenger.h
<<
>>
Prefs
   1#ifndef __FS_CEPH_MESSENGER_H
   2#define __FS_CEPH_MESSENGER_H
   3
   4#include <linux/blk_types.h>
   5#include <linux/kref.h>
   6#include <linux/mutex.h>
   7#include <linux/net.h>
   8#include <linux/radix-tree.h>
   9#include <linux/uio.h>
  10#include <linux/workqueue.h>
  11#include <net/net_namespace.h>
  12
  13#include <linux/ceph/types.h>
  14#include <linux/ceph/buffer.h>
  15
  /*
   * Forward declarations: the callback table below only needs pointers
   * to these types.
   */
  struct ceph_msg;
  struct ceph_connection;
  struct ceph_msg_header;
  struct ceph_auth_handshake;

  /*
   * Callbacks Ceph defines for handling connection events.
   */
  struct ceph_connection_operations {
          /* NOTE(review): presumably take/drop a reference held on behalf
           * of the connection -- confirm against the implementations. */
          struct ceph_connection *(*get)(struct ceph_connection *);
          void (*put)(struct ceph_connection *);

          /* handle an incoming message. */
          void (*dispatch)(struct ceph_connection *con, struct ceph_msg *m);

          /* authorize an outgoing connection */
          struct ceph_auth_handshake *(*get_authorizer)(
                                  struct ceph_connection *con,
                                  int *proto, int force_new);
          int (*verify_authorizer_reply)(struct ceph_connection *con, int len);
          int (*invalidate_authorizer)(struct ceph_connection *con);

          /* there was some error on the socket (disconnect, whatever) */
          void (*fault)(struct ceph_connection *con);

          /* a remote host has terminated a message exchange session, and
           * messages we sent (or they tried to send us) may be lost. */
          void (*peer_reset)(struct ceph_connection *con);

          /* NOTE(review): presumably supplies the ceph_msg to receive into
           * for header @hdr, with *skip as an out-flag -- confirm. */
          struct ceph_msg *(*alloc_msg)(struct ceph_connection *con,
                                        struct ceph_msg_header *hdr,
                                        int *skip);

          /* message signing hooks; both return an int status */
          int (*sign_message)(struct ceph_connection *con, struct ceph_msg *msg);
          int (*check_message_signature)(struct ceph_connection *con,
                                         struct ceph_msg *msg);
  };
  51
  /*
   * Expands to TWO comma-separated printf-style arguments: the entity
   * type name and the entity number.  The number comes from
   * le64_to_cpu(), i.e. it is 64 bits wide, so use format string %s%lld.
   * Note that the argument @n is evaluated twice.
   */
  #define ENTITY_NAME(n) ceph_entity_type_name((n).type), le64_to_cpu((n).num)
  54
  55struct ceph_messenger {
  56        struct ceph_entity_inst inst;    /* my name+address */
  57        struct ceph_entity_addr my_enc_addr;
  58
  59        atomic_t stopping;
  60        possible_net_t net;
  61        bool nocrc;
  62        bool tcp_nodelay;
  63
  64        /*
  65         * the global_seq counts connections i (attempt to) initiate
  66         * in order to disambiguate certain connect race conditions.
  67         */
  68        u32 global_seq;
  69        spinlock_t global_seq_lock;
  70
  71        u64 supported_features;
  72        u64 required_features;
  73};
  74
  /*
   * Kinds of data payload a message data item can describe.  The bio
   * variant only exists when the block layer is configured in.
   */
  enum ceph_msg_data_type {
          CEPH_MSG_DATA_NONE,     /* message contains no data payload */
          CEPH_MSG_DATA_PAGES,    /* data source/destination is a page array */
          CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
  #ifdef CONFIG_BLOCK
          CEPH_MSG_DATA_BIO,      /* data source/destination is a bio list */
  #endif /* CONFIG_BLOCK */
  };

  /*
   * Return true if @type is one of the ceph_msg_data_type enumerators
   * compiled into this build, false for any other value.
   *
   * Keep the case list in step with the enum above (including the
   * CONFIG_BLOCK guard).
   */
  static inline bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
  {
          switch (type) {
          case CEPH_MSG_DATA_NONE:
          case CEPH_MSG_DATA_PAGES:
          case CEPH_MSG_DATA_PAGELIST:
  #ifdef CONFIG_BLOCK
          case CEPH_MSG_DATA_BIO:
  #endif /* CONFIG_BLOCK */
                  return true;
          default:
                  return false;
          }
  }
  98
  99struct ceph_msg_data {
 100        struct list_head                links;  /* ceph_msg->data */
 101        enum ceph_msg_data_type         type;
 102        union {
 103#ifdef CONFIG_BLOCK
 104                struct {
 105                        struct bio      *bio;
 106                        size_t          bio_length;
 107                };
 108#endif /* CONFIG_BLOCK */
 109                struct {
 110                        struct page     **pages;        /* NOT OWNER. */
 111                        size_t          length;         /* total # bytes */
 112                        unsigned int    alignment;      /* first page */
 113                };
 114                struct ceph_pagelist    *pagelist;
 115        };
 116};
 117
 /*
  * Position within the data items of a message.  The leading fields are
  * common; the anonymous union carries the per-type position (bio, page
  * array or pagelist), so those per-type members share storage.
  */
 struct ceph_msg_data_cursor {
         size_t                  total_resid;    /* across all data items */
         struct list_head        *data_head;     /* = &ceph_msg->data */

         struct ceph_msg_data    *data;          /* current data item */
         size_t                  resid;          /* bytes not yet consumed */
         bool                    last_piece;     /* current is last piece */
         bool                    need_crc;       /* crc update needed */
         union {
 #ifdef CONFIG_BLOCK
                 struct {                                /* bio */
                         struct bio      *bio;           /* bio from list */
                         struct bvec_iter bvec_iter;
                 };
 #endif /* CONFIG_BLOCK */
                 struct {                                /* pages */
                         unsigned int    page_offset;    /* offset in page */
                         unsigned short  page_index;     /* index in array */
                         unsigned short  page_count;     /* pages in array */
                 };
                 struct {                                /* pagelist */
                         struct page     *page;          /* page from list */
                         size_t          offset;         /* bytes from list */
                 };
         };
 };
 144
 145/*
 146 * a single message.  it contains a header (src, dest, message type, etc.),
 147 * footer (crc values, mainly), a "front" message body, and possibly a
 148 * data payload (stored in some number of pages).
 149 */
 150struct ceph_msg {
 151        struct ceph_msg_header hdr;     /* header */
 152        union {
 153                struct ceph_msg_footer footer;          /* footer */
 154                struct ceph_msg_footer_old old_footer;  /* old format footer */
 155        };
 156        struct kvec front;              /* unaligned blobs of message */
 157        struct ceph_buffer *middle;
 158
 159        size_t                          data_length;
 160        struct list_head                data;
 161        struct ceph_msg_data_cursor     cursor;
 162
 163        struct ceph_connection *con;
 164        struct list_head list_head;     /* links for connection lists */
 165
 166        struct kref kref;
 167        bool more_to_follow;
 168        bool needs_out_seq;
 169        int front_alloc_len;
 170        unsigned long ack_stamp;        /* tx: when we were acked */
 171
 172        struct ceph_msgpool *pool;
 173};
 174
 /*
  * ceph connection fault delay defaults, for exponential backoff.
  * Both are expressed in terms of HZ: the base interval is HZ/2 and the
  * cap is 5 * 60 * HZ.
  */
 #define BASE_DELAY_INTERVAL     (HZ / 2)
 #define MAX_DELAY_INTERVAL      (5 * 60 * HZ)
 178
 179/*
 180 * A single connection with another host.
 181 *
 182 * We maintain a queue of outgoing messages, and some session state to
 183 * ensure that we can preserve the lossless, ordered delivery of
 184 * messages in the case of a TCP disconnect.
 185 */
 186struct ceph_connection {
 187        void *private;
 188
 189        const struct ceph_connection_operations *ops;
 190
 191        struct ceph_messenger *msgr;
 192
 193        atomic_t sock_state;
 194        struct socket *sock;
 195        struct ceph_entity_addr peer_addr; /* peer address */
 196        struct ceph_entity_addr peer_addr_for_me;
 197
 198        unsigned long flags;
 199        unsigned long state;
 200        const char *error_msg;  /* error message, if any */
 201
 202        struct ceph_entity_name peer_name; /* peer name */
 203
 204        u64 peer_features;
 205        u32 connect_seq;      /* identify the most recent connection
 206                                 attempt for this connection, client */
 207        u32 peer_global_seq;  /* peer's global seq for this connection */
 208
 209        int auth_retry;       /* true if we need a newer authorizer */
 210        void *auth_reply_buf;   /* where to put the authorizer reply */
 211        int auth_reply_buf_len;
 212
 213        struct mutex mutex;
 214
 215        /* out queue */
 216        struct list_head out_queue;
 217        struct list_head out_sent;   /* sending or sent but unacked */
 218        u64 out_seq;                 /* last message queued for send */
 219
 220        u64 in_seq, in_seq_acked;  /* last message received, acked */
 221
 222        /* connection negotiation temps */
 223        char in_banner[CEPH_BANNER_MAX_LEN];
 224        struct ceph_msg_connect out_connect;
 225        struct ceph_msg_connect_reply in_reply;
 226        struct ceph_entity_addr actual_peer_addr;
 227
 228        /* message out temps */
 229        struct ceph_msg *out_msg;        /* sending message (== tail of
 230                                            out_sent) */
 231        bool out_msg_done;
 232
 233        struct kvec out_kvec[8],         /* sending header/footer data */
 234                *out_kvec_cur;
 235        int out_kvec_left;   /* kvec's left in out_kvec */
 236        int out_skip;        /* skip this many bytes */
 237        int out_kvec_bytes;  /* total bytes left */
 238        bool out_kvec_is_msg; /* kvec refers to out_msg */
 239        int out_more;        /* there is more data after the kvecs */
 240        __le64 out_temp_ack; /* for writing an ack */
 241        struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
 242                                                     stamp */
 243
 244        /* message in temps */
 245        struct ceph_msg_header in_hdr;
 246        struct ceph_msg *in_msg;
 247        u32 in_front_crc, in_middle_crc, in_data_crc;  /* calculated crc */
 248
 249        char in_tag;         /* protocol control byte */
 250        int in_base_pos;     /* bytes read */
 251        __le64 in_temp_ack;  /* for reading an ack */
 252
 253        struct timespec last_keepalive_ack; /* keepalive2 ack stamp */
 254
 255        struct delayed_work work;           /* send|recv work */
 256        unsigned long       delay;          /* current delay interval */
 257};
 258
 259
 260extern const char *ceph_pr_addr(const struct sockaddr_storage *ss);
 261extern int ceph_parse_ips(const char *c, const char *end,
 262                          struct ceph_entity_addr *addr,
 263                          int max_count, int *count);
 264
 265
 266extern int ceph_msgr_init(void);
 267extern void ceph_msgr_exit(void);
 268extern void ceph_msgr_flush(void);
 269
 270extern void ceph_messenger_init(struct ceph_messenger *msgr,
 271                        struct ceph_entity_addr *myaddr,
 272                        u64 supported_features,
 273                        u64 required_features,
 274                        bool nocrc,
 275                        bool tcp_nodelay);
 276extern void ceph_messenger_fini(struct ceph_messenger *msgr);
 277
 278extern void ceph_con_init(struct ceph_connection *con, void *private,
 279                        const struct ceph_connection_operations *ops,
 280                        struct ceph_messenger *msgr);
 281extern void ceph_con_open(struct ceph_connection *con,
 282                          __u8 entity_type, __u64 entity_num,
 283                          struct ceph_entity_addr *addr);
 284extern bool ceph_con_opened(struct ceph_connection *con);
 285extern void ceph_con_close(struct ceph_connection *con);
 286extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 287
 288extern void ceph_msg_revoke(struct ceph_msg *msg);
 289extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
 290
 291extern void ceph_con_keepalive(struct ceph_connection *con);
 292extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
 293                                       unsigned long interval);
 294
 295extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 296                                size_t length, size_t alignment);
 297extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 298                                struct ceph_pagelist *pagelist);
 299#ifdef CONFIG_BLOCK
 300extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 301                                size_t length);
 302#endif /* CONFIG_BLOCK */
 303
 304extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 305                                     bool can_fail);
 306
 307extern struct ceph_msg *ceph_msg_get(struct ceph_msg *msg);
 308extern void ceph_msg_put(struct ceph_msg *msg);
 309
 310extern void ceph_msg_dump(struct ceph_msg *msg);
 311
 312#endif
 313