/* linux/include/linux/ceph/messenger.h */
   1#ifndef __FS_CEPH_MESSENGER_H
   2#define __FS_CEPH_MESSENGER_H
   3
   4#include <linux/blk_types.h>
   5#include <linux/kref.h>
   6#include <linux/mutex.h>
   7#include <linux/net.h>
   8#include <linux/radix-tree.h>
   9#include <linux/uio.h>
  10#include <linux/workqueue.h>
  11#include <net/net_namespace.h>
  12
  13#include <linux/ceph/types.h>
  14#include <linux/ceph/buffer.h>
  15
  16struct ceph_msg;
  17struct ceph_connection;
  18
  19/*
  20 * Ceph defines these callbacks for handling connection events.
  21 */
  22struct ceph_connection_operations {
  23        struct ceph_connection *(*get)(struct ceph_connection *);
  24        void (*put)(struct ceph_connection *);
  25
  26        /* handle an incoming message. */
  27        void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
  28
  29        /* authorize an outgoing connection */
  30        struct ceph_auth_handshake *(*get_authorizer) (
  31                                struct ceph_connection *con,
  32                               int *proto, int force_new);
  33        int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
  34        int (*invalidate_authorizer)(struct ceph_connection *con);
  35
  36        /* there was some error on the socket (disconnect, whatever) */
  37        void (*fault) (struct ceph_connection *con);
  38
  39        /* a remote host as terminated a message exchange session, and messages
  40         * we sent (or they tried to send us) may be lost. */
  41        void (*peer_reset) (struct ceph_connection *con);
  42
  43        struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
  44                                        struct ceph_msg_header *hdr,
  45                                        int *skip);
  46
  47        int (*sign_message) (struct ceph_msg *msg);
  48        int (*check_message_signature) (struct ceph_msg *msg);
  49};
  50
  51/* use format string %s%d */
  52#define ENTITY_NAME(n) ceph_entity_type_name((n).type), le64_to_cpu((n).num)
  53
  54struct ceph_messenger {
  55        struct ceph_entity_inst inst;    /* my name+address */
  56        struct ceph_entity_addr my_enc_addr;
  57
  58        atomic_t stopping;
  59        possible_net_t net;
  60
  61        /*
  62         * the global_seq counts connections i (attempt to) initiate
  63         * in order to disambiguate certain connect race conditions.
  64         */
  65        u32 global_seq;
  66        spinlock_t global_seq_lock;
  67};
  68
  69enum ceph_msg_data_type {
  70        CEPH_MSG_DATA_NONE,     /* message contains no data payload */
  71        CEPH_MSG_DATA_PAGES,    /* data source/destination is a page array */
  72        CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
  73#ifdef CONFIG_BLOCK
  74        CEPH_MSG_DATA_BIO,      /* data source/destination is a bio list */
  75#endif /* CONFIG_BLOCK */
  76};
  77
  78static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
  79{
  80        switch (type) {
  81        case CEPH_MSG_DATA_NONE:
  82        case CEPH_MSG_DATA_PAGES:
  83        case CEPH_MSG_DATA_PAGELIST:
  84#ifdef CONFIG_BLOCK
  85        case CEPH_MSG_DATA_BIO:
  86#endif /* CONFIG_BLOCK */
  87                return true;
  88        default:
  89                return false;
  90        }
  91}
  92
  93struct ceph_msg_data {
  94        struct list_head                links;  /* ceph_msg->data */
  95        enum ceph_msg_data_type         type;
  96        union {
  97#ifdef CONFIG_BLOCK
  98                struct {
  99                        struct bio      *bio;
 100                        size_t          bio_length;
 101                };
 102#endif /* CONFIG_BLOCK */
 103                struct {
 104                        struct page     **pages;        /* NOT OWNER. */
 105                        size_t          length;         /* total # bytes */
 106                        unsigned int    alignment;      /* first page */
 107                };
 108                struct ceph_pagelist    *pagelist;
 109        };
 110};
 111
 112struct ceph_msg_data_cursor {
 113        size_t                  total_resid;    /* across all data items */
 114        struct list_head        *data_head;     /* = &ceph_msg->data */
 115
 116        struct ceph_msg_data    *data;          /* current data item */
 117        size_t                  resid;          /* bytes not yet consumed */
 118        bool                    last_piece;     /* current is last piece */
 119        bool                    need_crc;       /* crc update needed */
 120        union {
 121#ifdef CONFIG_BLOCK
 122                struct {                                /* bio */
 123                        struct bio      *bio;           /* bio from list */
 124                        struct bvec_iter bvec_iter;
 125                };
 126#endif /* CONFIG_BLOCK */
 127                struct {                                /* pages */
 128                        unsigned int    page_offset;    /* offset in page */
 129                        unsigned short  page_index;     /* index in array */
 130                        unsigned short  page_count;     /* pages in array */
 131                };
 132                struct {                                /* pagelist */
 133                        struct page     *page;          /* page from list */
 134                        size_t          offset;         /* bytes from list */
 135                };
 136        };
 137};
 138
 139/*
 140 * a single message.  it contains a header (src, dest, message type, etc.),
 141 * footer (crc values, mainly), a "front" message body, and possibly a
 142 * data payload (stored in some number of pages).
 143 */
 144struct ceph_msg {
 145        struct ceph_msg_header hdr;     /* header */
 146        union {
 147                struct ceph_msg_footer footer;          /* footer */
 148                struct ceph_msg_footer_old old_footer;  /* old format footer */
 149        };
 150        struct kvec front;              /* unaligned blobs of message */
 151        struct ceph_buffer *middle;
 152
 153        size_t                          data_length;
 154        struct list_head                data;
 155        struct ceph_msg_data_cursor     cursor;
 156
 157        struct ceph_connection *con;
 158        struct list_head list_head;     /* links for connection lists */
 159
 160        struct kref kref;
 161        bool more_to_follow;
 162        bool needs_out_seq;
 163        int front_alloc_len;
 164        unsigned long ack_stamp;        /* tx: when we were acked */
 165
 166        struct ceph_msgpool *pool;
 167};
 168
 169/* ceph connection fault delay defaults, for exponential backoff */
 170#define BASE_DELAY_INTERVAL     (HZ/2)
 171#define MAX_DELAY_INTERVAL      (5 * 60 * HZ)
 172
 173/*
 174 * A single connection with another host.
 175 *
 176 * We maintain a queue of outgoing messages, and some session state to
 177 * ensure that we can preserve the lossless, ordered delivery of
 178 * messages in the case of a TCP disconnect.
 179 */
 180struct ceph_connection {
 181        void *private;
 182
 183        const struct ceph_connection_operations *ops;
 184
 185        struct ceph_messenger *msgr;
 186
 187        atomic_t sock_state;
 188        struct socket *sock;
 189        struct ceph_entity_addr peer_addr; /* peer address */
 190        struct ceph_entity_addr peer_addr_for_me;
 191
 192        unsigned long flags;
 193        unsigned long state;
 194        const char *error_msg;  /* error message, if any */
 195
 196        struct ceph_entity_name peer_name; /* peer name */
 197
 198        u64 peer_features;
 199        u32 connect_seq;      /* identify the most recent connection
 200                                 attempt for this connection, client */
 201        u32 peer_global_seq;  /* peer's global seq for this connection */
 202
 203        int auth_retry;       /* true if we need a newer authorizer */
 204        void *auth_reply_buf;   /* where to put the authorizer reply */
 205        int auth_reply_buf_len;
 206
 207        struct mutex mutex;
 208
 209        /* out queue */
 210        struct list_head out_queue;
 211        struct list_head out_sent;   /* sending or sent but unacked */
 212        u64 out_seq;                 /* last message queued for send */
 213
 214        u64 in_seq, in_seq_acked;  /* last message received, acked */
 215
 216        /* connection negotiation temps */
 217        char in_banner[CEPH_BANNER_MAX_LEN];
 218        struct ceph_msg_connect out_connect;
 219        struct ceph_msg_connect_reply in_reply;
 220        struct ceph_entity_addr actual_peer_addr;
 221
 222        /* message out temps */
 223        struct ceph_msg_header out_hdr;
 224        struct ceph_msg *out_msg;        /* sending message (== tail of
 225                                            out_sent) */
 226        bool out_msg_done;
 227
 228        struct kvec out_kvec[8],         /* sending header/footer data */
 229                *out_kvec_cur;
 230        int out_kvec_left;   /* kvec's left in out_kvec */
 231        int out_skip;        /* skip this many bytes */
 232        int out_kvec_bytes;  /* total bytes left */
 233        int out_more;        /* there is more data after the kvecs */
 234        __le64 out_temp_ack; /* for writing an ack */
 235        struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
 236                                                     stamp */
 237
 238        /* message in temps */
 239        struct ceph_msg_header in_hdr;
 240        struct ceph_msg *in_msg;
 241        u32 in_front_crc, in_middle_crc, in_data_crc;  /* calculated crc */
 242
 243        char in_tag;         /* protocol control byte */
 244        int in_base_pos;     /* bytes read */
 245        __le64 in_temp_ack;  /* for reading an ack */
 246
 247        struct timespec last_keepalive_ack; /* keepalive2 ack stamp */
 248
 249        struct delayed_work work;           /* send|recv work */
 250        unsigned long       delay;          /* current delay interval */
 251};
 252
 253
 254extern const char *ceph_pr_addr(const struct sockaddr_storage *ss);
 255extern int ceph_parse_ips(const char *c, const char *end,
 256                          struct ceph_entity_addr *addr,
 257                          int max_count, int *count);
 258
 259
 260extern int ceph_msgr_init(void);
 261extern void ceph_msgr_exit(void);
 262extern void ceph_msgr_flush(void);
 263
 264extern void ceph_messenger_init(struct ceph_messenger *msgr,
 265                                struct ceph_entity_addr *myaddr);
 266extern void ceph_messenger_fini(struct ceph_messenger *msgr);
 267
 268extern void ceph_con_init(struct ceph_connection *con, void *private,
 269                        const struct ceph_connection_operations *ops,
 270                        struct ceph_messenger *msgr);
 271extern void ceph_con_open(struct ceph_connection *con,
 272                          __u8 entity_type, __u64 entity_num,
 273                          struct ceph_entity_addr *addr);
 274extern bool ceph_con_opened(struct ceph_connection *con);
 275extern void ceph_con_close(struct ceph_connection *con);
 276extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 277
 278extern void ceph_msg_revoke(struct ceph_msg *msg);
 279extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
 280
 281extern void ceph_con_keepalive(struct ceph_connection *con);
 282extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
 283                                       unsigned long interval);
 284
 285extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 286                                size_t length, size_t alignment);
 287extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 288                                struct ceph_pagelist *pagelist);
 289#ifdef CONFIG_BLOCK
 290extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 291                                size_t length);
 292#endif /* CONFIG_BLOCK */
 293
 294extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 295                                     bool can_fail);
 296
 297extern struct ceph_msg *ceph_msg_get(struct ceph_msg *msg);
 298extern void ceph_msg_put(struct ceph_msg *msg);
 299
 300extern void ceph_msg_dump(struct ceph_msg *msg);
 301
 302#endif
 303