1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.gnu.org/licenses/gpl-2.0.html 19 * 20 * GPL HEADER END 21 */ 22/* 23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. 24 * Use is subject to license terms. 25 * 26 * Copyright (c) 2012, Intel Corporation. 27 */ 28/* 29 * This file is part of Lustre, http://www.lustre.org/ 30 * Lustre is a trademark of Sun Microsystems, Inc. 31 * 32 * lnet/lnet/lib-eq.c 33 * 34 * Library level Event queue management routines 35 */ 36 37#define DEBUG_SUBSYSTEM S_LNET 38#include "../../include/linux/lnet/lib-lnet.h" 39 40/** 41 * Create an event queue that has room for \a count number of events. 42 * 43 * The event queue is circular and older events will be overwritten by new 44 * ones if they are not removed in time by the user using the functions 45 * LNetEQGet(), LNetEQWait(), or LNetEQPoll(). It is up to the user to 46 * determine the appropriate size of the event queue to prevent this loss 47 * of events. Note that when EQ handler is specified in \a callback, no 48 * event loss can happen, since the handler is run for each event deposited 49 * into the EQ. 50 * 51 * \param count The number of events to be stored in the event queue. It 52 * will be rounded up to the next power of two. 53 * \param callback A handler function that runs when an event is deposited 54 * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to 55 * indicate that no event handler is desired. 56 * \param handle On successful return, this location will hold a handle for 57 * the newly created EQ. 58 * 59 * \retval 0 On success. 60 * \retval -EINVAL If an parameter is not valid. 61 * \retval -ENOMEM If memory for the EQ can't be allocated. 62 * 63 * \see lnet_eq_handler_t for the discussion on EQ handler semantics. 64 */ 65int 66LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback, 67 lnet_handle_eq_t *handle) 68{ 69 lnet_eq_t *eq; 70 71 LASSERT(the_lnet.ln_refcount > 0); 72 73 /* 74 * We need count to be a power of 2 so that when eq_{enq,deq}_seq 75 * overflow, they don't skip entries, so the queue has the same 76 * apparent capacity at all times 77 */ 78 if (count) 79 count = roundup_pow_of_two(count); 80 81 if (callback != LNET_EQ_HANDLER_NONE && count) 82 CWARN("EQ callback is guaranteed to get every event, do you still want to set eqcount %d for polling event which will have locking overhead? Please contact with developer to confirm\n", count); 83 84 /* 85 * count can be 0 if only need callback, we can eliminate 86 * overhead of enqueue event 87 */ 88 if (!count && callback == LNET_EQ_HANDLER_NONE) 89 return -EINVAL; 90 91 eq = lnet_eq_alloc(); 92 if (!eq) 93 return -ENOMEM; 94 95 if (count) { 96 LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t)); 97 if (!eq->eq_events) 98 goto failed; 99 /* 100 * NB allocator has set all event sequence numbers to 0, 101 * so all them should be earlier than eq_deq_seq 102 */ 103 } 104 105 eq->eq_deq_seq = 1; 106 eq->eq_enq_seq = 1; 107 eq->eq_size = count; 108 eq->eq_callback = callback; 109 110 eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(), 111 sizeof(*eq->eq_refs[0])); 112 if (!eq->eq_refs) 113 goto failed; 114 115 /* MUST hold both exclusive lnet_res_lock */ 116 lnet_res_lock(LNET_LOCK_EX); 117 /* 118 * NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do 119 * both EQ lookup and poll event with only lnet_eq_wait_lock 120 */ 121 lnet_eq_wait_lock(); 122 123 lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh); 124 list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active); 125 126 lnet_eq_wait_unlock(); 127 lnet_res_unlock(LNET_LOCK_EX); 128 129 lnet_eq2handle(handle, eq); 130 return 0; 131 132failed: 133 if (eq->eq_events) 134 LIBCFS_FREE(eq->eq_events, count * sizeof(lnet_event_t)); 135 136 if (eq->eq_refs) 137 cfs_percpt_free(eq->eq_refs); 138 139 lnet_eq_free(eq); 140 return -ENOMEM; 141} 142EXPORT_SYMBOL(LNetEQAlloc); 143 144/** 145 * Release the resources associated with an event queue if it's idle; 146 * otherwise do nothing and it's up to the user to try again. 147 * 148 * \param eqh A handle for the event queue to be released. 149 * 150 * \retval 0 If the EQ is not in use and freed. 151 * \retval -ENOENT If \a eqh does not point to a valid EQ. 152 * \retval -EBUSY If the EQ is still in use by some MDs. 153 */ 154int 155LNetEQFree(lnet_handle_eq_t eqh) 156{ 157 struct lnet_eq *eq; 158 lnet_event_t *events = NULL; 159 int **refs = NULL; 160 int *ref; 161 int rc = 0; 162 int size = 0; 163 int i; 164 165 LASSERT(the_lnet.ln_refcount > 0); 166 167 lnet_res_lock(LNET_LOCK_EX); 168 /* 169 * NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do 170 * both EQ lookup and poll event with only lnet_eq_wait_lock 171 */ 172 lnet_eq_wait_lock(); 173 174 eq = lnet_handle2eq(&eqh); 175 if (!eq) { 176 rc = -ENOENT; 177 goto out; 178 } 179 180 cfs_percpt_for_each(ref, i, eq->eq_refs) { 181 LASSERT(*ref >= 0); 182 if (!*ref) 183 continue; 184 185 CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n", 186 i, *ref); 187 rc = -EBUSY; 188 goto out; 189 } 190 191 /* stash for free after lock dropped */ 192 events = eq->eq_events; 193 size = eq->eq_size; 194 refs = eq->eq_refs; 195 196 lnet_res_lh_invalidate(&eq->eq_lh); 197 list_del(&eq->eq_list); 198 lnet_eq_free(eq); 199 out: 200 lnet_eq_wait_unlock(); 201 lnet_res_unlock(LNET_LOCK_EX); 202 203 if (events) 204 LIBCFS_FREE(events, size * sizeof(lnet_event_t)); 205 if (refs) 206 cfs_percpt_free(refs); 207 208 return rc; 209} 210EXPORT_SYMBOL(LNetEQFree); 211 212void 213lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev) 214{ 215 /* MUST called with resource lock hold but w/o lnet_eq_wait_lock */ 216 int index; 217 218 if (!eq->eq_size) { 219 LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE); 220 eq->eq_callback(ev); 221 return; 222 } 223 224 lnet_eq_wait_lock(); 225 ev->sequence = eq->eq_enq_seq++; 226 227 LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size)); 228 index = ev->sequence & (eq->eq_size - 1); 229 230 eq->eq_events[index] = *ev; 231 232 if (eq->eq_callback != LNET_EQ_HANDLER_NONE) 233 eq->eq_callback(ev); 234 235 /* Wake anyone waiting in LNetEQPoll() */ 236 if (waitqueue_active(&the_lnet.ln_eq_waitq)) 237 wake_up_all(&the_lnet.ln_eq_waitq); 238 lnet_eq_wait_unlock(); 239} 240 241static int 242lnet_eq_dequeue_event(lnet_eq_t *eq, lnet_event_t *ev) 243{ 244 int new_index = eq->eq_deq_seq & (eq->eq_size - 1); 245 lnet_event_t *new_event = &eq->eq_events[new_index]; 246 int rc; 247 248 /* must called with lnet_eq_wait_lock hold */ 249 if (LNET_SEQ_GT(eq->eq_deq_seq, new_event->sequence)) 250 return 0; 251 252 /* We've got a new event... */ 253 *ev = *new_event; 254 255 CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n", 256 new_event, eq->eq_deq_seq, eq->eq_size); 257 258 /* ...but did it overwrite an event we've not seen yet? */ 259 if (eq->eq_deq_seq == new_event->sequence) { 260 rc = 1; 261 } else { 262 /* 263 * don't complain with CERROR: some EQs are sized small 264 * anyway; if it's important, the caller should complain 265 */ 266 CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n", 267 eq->eq_deq_seq, new_event->sequence); 268 rc = -EOVERFLOW; 269 } 270 271 eq->eq_deq_seq = new_event->sequence + 1; 272 return rc; 273} 274 275/** 276 * A nonblocking function that can be used to get the next event in an EQ. 277 * If an event handler is associated with the EQ, the handler will run before 278 * this function returns successfully. The event is removed from the queue. 279 * 280 * \param eventq A handle for the event queue. 281 * \param event On successful return (1 or -EOVERFLOW), this location will 282 * hold the next event in the EQ. 283 * 284 * \retval 0 No pending event in the EQ. 285 * \retval 1 Indicates success. 286 * \retval -ENOENT If \a eventq does not point to a valid EQ. 287 * \retval -EOVERFLOW Indicates success (i.e., an event is returned) and that 288 * at least one event between this event and the last event obtained from the 289 * EQ has been dropped due to limited space in the EQ. 290 */ 291 292/** 293 * Block the calling process until there is an event in the EQ. 294 * If an event handler is associated with the EQ, the handler will run before 295 * this function returns successfully. This function returns the next event 296 * in the EQ and removes it from the EQ. 297 * 298 * \param eventq A handle for the event queue. 299 * \param event On successful return (1 or -EOVERFLOW), this location will 300 * hold the next event in the EQ. 301 * 302 * \retval 1 Indicates success. 303 * \retval -ENOENT If \a eventq does not point to a valid EQ. 304 * \retval -EOVERFLOW Indicates success (i.e., an event is returned) and that 305 * at least one event between this event and the last event obtained from the 306 * EQ has been dropped due to limited space in the EQ. 307 */ 308 309static int 310lnet_eq_wait_locked(int *timeout_ms) 311__must_hold(&the_lnet.ln_eq_wait_lock) 312{ 313 int tms = *timeout_ms; 314 int wait; 315 wait_queue_t wl; 316 unsigned long now; 317 318 if (!tms) 319 return -ENXIO; /* don't want to wait and no new event */ 320 321 init_waitqueue_entry(&wl, current); 322 set_current_state(TASK_INTERRUPTIBLE); 323 add_wait_queue(&the_lnet.ln_eq_waitq, &wl); 324 325 lnet_eq_wait_unlock(); 326 327 if (tms < 0) { 328 schedule(); 329 } else { 330 now = jiffies; 331 schedule_timeout(msecs_to_jiffies(tms)); 332 tms -= jiffies_to_msecs(jiffies - now); 333 if (tms < 0) /* no more wait but may have new event */ 334 tms = 0; 335 } 336 337 wait = tms; /* might need to call here again */ 338 *timeout_ms = tms; 339 340 lnet_eq_wait_lock(); 341 remove_wait_queue(&the_lnet.ln_eq_waitq, &wl); 342 343 return wait; 344} 345 346/** 347 * Block the calling process until there's an event from a set of EQs or 348 * timeout happens. 349 * 350 * If an event handler is associated with the EQ, the handler will run before 351 * this function returns successfully, in which case the corresponding event 352 * is consumed. 353 * 354 * LNetEQPoll() provides a timeout to allow applications to poll, block for a 355 * fixed period, or block indefinitely. 356 * 357 * \param eventqs,neq An array of EQ handles, and size of the array. 358 * \param timeout_ms Time in milliseconds to wait for an event to occur on 359 * one of the EQs. The constant LNET_TIME_FOREVER can be used to indicate an 360 * infinite timeout. 361 * \param event,which On successful return (1 or -EOVERFLOW), \a event will 362 * hold the next event in the EQs, and \a which will contain the index of the 363 * EQ from which the event was taken. 364 * 365 * \retval 0 No pending event in the EQs after timeout. 366 * \retval 1 Indicates success. 367 * \retval -EOVERFLOW Indicates success (i.e., an event is returned) and that 368 * at least one event between this event and the last event obtained from the 369 * EQ indicated by \a which has been dropped due to limited space in the EQ. 370 * \retval -ENOENT If there's an invalid handle in \a eventqs. 371 */ 372int 373LNetEQPoll(lnet_handle_eq_t *eventqs, int neq, int timeout_ms, 374 lnet_event_t *event, int *which) 375{ 376 int wait = 1; 377 int rc; 378 int i; 379 380 LASSERT(the_lnet.ln_refcount > 0); 381 382 if (neq < 1) 383 return -ENOENT; 384 385 lnet_eq_wait_lock(); 386 387 for (;;) { 388 for (i = 0; i < neq; i++) { 389 lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]); 390 391 if (!eq) { 392 lnet_eq_wait_unlock(); 393 return -ENOENT; 394 } 395 396 rc = lnet_eq_dequeue_event(eq, event); 397 if (rc) { 398 lnet_eq_wait_unlock(); 399 *which = i; 400 return rc; 401 } 402 } 403 404 if (!wait) 405 break; 406 407 /* 408 * return value of lnet_eq_wait_locked: 409 * -1 : did nothing and it's sure no new event 410 * 1 : sleep inside and wait until new event 411 * 0 : don't want to wait anymore, but might have new event 412 * so need to call dequeue again 413 */ 414 wait = lnet_eq_wait_locked(&timeout_ms); 415 if (wait < 0) /* no new event */ 416 break; 417 } 418 419 lnet_eq_wait_unlock(); 420 return 0; 421} 422