/*
 * call-seq:
 *   ZMQ.select(in, out=[], err=[], timeout=nil) -> [in, out, err] | nil
 *
 * Like IO.select, but also works with 0MQ sockets.
 *
 * +in+, +out+ and +err+ are each either +nil+ or an Array of objects to be
 * polled for readability, writability and error conditions respectively;
 * any other type raises TypeError. +timeout+ is a number of seconds
 * (fractions allowed); +nil+ means wait indefinitely.
 *
 * Returns a three-element Array holding the objects that reported read,
 * write and error events, or +nil+ if the poll returned without any
 * events (timeout). Raises RuntimeError carrying the 0MQ error message if
 * the underlying zmq_poll call fails.
 */
static VALUE module_select (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE readset, writeset, errset, timeout;
    rb_scan_args (argc_, argv_, "13", &readset, &writeset, &errset, &timeout);

    long timeout_usec;
    int rc, nitems, i;
    zmq_pollitem_t *items, *item;

    if (!NIL_P (readset)) Check_Type (readset, T_ARRAY);
    if (!NIL_P (writeset)) Check_Type (writeset, T_ARRAY);
    if (!NIL_P (errset)) Check_Type (errset, T_ARRAY);

    /* A nil timeout blocks forever (-1); otherwise seconds -> microseconds. */
    if (NIL_P (timeout))
        timeout_usec = -1;
    else
        timeout_usec = (long)(NUM2DBL (timeout) * 1000000);

    /* Conservative estimate for nitems before we traverse the lists. */
    nitems = (NIL_P (readset) ? 0 : RARRAY_LEN (readset)) +
             (NIL_P (writeset) ? 0 : RARRAY_LEN (writeset)) +
             (NIL_P (errset) ? 0 : RARRAY_LEN (errset));
    items = (zmq_pollitem_t*)ruby_xmalloc(sizeof(zmq_pollitem_t) * nitems);

    /*
     * poll_add_item fills ps.items / ps.io_objects and bumps ps.nitems for
     * each element of the set being iterated, tagging it with ps.event.
     * NOTE(review): if poll_add_item raises, `items` is not released here;
     * closing that hole needs an rb_ensure/rb_protect wrapper.
     */
    struct poll_state ps;
    ps.nitems = 0;
    ps.items = items;
    ps.io_objects = rb_ary_new ();

    if (!NIL_P (readset)) {
        ps.event = ZMQ_POLLIN;
        rb_iterate(rb_each, readset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    if (!NIL_P (writeset)) {
        ps.event = ZMQ_POLLOUT;
        rb_iterate(rb_each, writeset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    if (!NIL_P (errset)) {
        ps.event = ZMQ_POLLERR;
        rb_iterate(rb_each, errset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    /* Reset nitems to the actual number of zmq_pollitem_t records we're sending. */
    nitems = ps.nitems;

#ifdef HAVE_RUBY_INTERN_H
    /* A zero timeout cannot block, so only the waiting case goes through
       rb_thread_blocking_region. */
    if (timeout_usec != 0) {
        struct zmq_poll_args poll_args;
        poll_args.items = items;
        poll_args.nitems = nitems;
        poll_args.timeout_usec = timeout_usec;

        rb_thread_blocking_region (zmq_poll_blocking, (void*)&poll_args, NULL, NULL);
        rc = poll_args.rc;
    }
    else
#endif
        rc = zmq_poll (items, nitems, timeout_usec);

    if (rc == -1) {
        /* Capture the error before freeing, then release the poll buffer:
           rb_raise does not return, so nothing after it would run. */
        int poll_errno = zmq_errno ();
        ruby_xfree (items);
        rb_raise(rb_eRuntimeError, "%s", zmq_strerror (poll_errno));
        return Qnil;
    }
    else if (rc == 0) {
        /* No events: release the poll buffer before the early return. */
        ruby_xfree (items);
        return Qnil;
    }

    VALUE read_active = rb_ary_new ();
    VALUE write_active = rb_ary_new ();
    VALUE err_active = rb_ary_new ();

    /* items[i] and ps.io_objects[i] are indexed in parallel; an object is
       reported in every result set whose event bit is present in revents. */
    for (i = 0, item = &items[0]; i < nitems; i++, item++) {
        if (item->revents != 0) {
            VALUE io = RARRAY_PTR (ps.io_objects)[i];

            if (item->revents & ZMQ_POLLIN)
                rb_ary_push (read_active, io);
            if (item->revents & ZMQ_POLLOUT)
                rb_ary_push (write_active, io);
            if (item->revents & ZMQ_POLLERR)
                rb_ary_push (err_active, io);
        }
    }

    ruby_xfree (items);

    return rb_ary_new3 (3, read_active, write_active, err_active);
}