/*
 * call-seq:
 *   ZMQ.select(in, out=[], err=[], timeout=nil) -> [in, out, err] | nil
 *
 * Like IO.select, but also works with 0MQ sockets.
 *
 * +in+, +out+ and +err+ are each either nil or an Array of objects to poll
 * for readability, writability and error conditions respectively.
 * +timeout+ is a number of seconds (fractions allowed); nil means wait
 * without a timeout.
 *
 * Returns a three-element Array of the objects that became ready, or nil
 * when the poll completed without reporting any event. Raises RuntimeError
 * carrying the 0MQ error message when the poll itself fails.
 */
static VALUE module_select (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE readset, writeset, errset, timeout;
    rb_scan_args (argc_, argv_, "13", &readset, &writeset, &errset, &timeout);

    long timeout_usec;
    int rc, nitems, i;
    zmq_pollitem_t *items, *item;

    if (!NIL_P (readset))
        Check_Type (readset, T_ARRAY);
    if (!NIL_P (writeset))
        Check_Type (writeset, T_ARRAY);
    if (!NIL_P (errset))
        Check_Type (errset, T_ARRAY);

    /* The Ruby-level timeout is in seconds; it is converted to microseconds
       here. A negative value is passed through to mean "no timeout". */
    if (NIL_P (timeout))
        timeout_usec = -1;
    else
        timeout_usec = (long)(NUM2DBL (timeout) * 1000000);

    /* Conservative estimate for nitems before we traverse the lists. */
    nitems = (NIL_P (readset) ? 0 : RARRAY_LEN (readset)) +
             (NIL_P (writeset) ? 0 : RARRAY_LEN (writeset)) +
             (NIL_P (errset) ? 0 : RARRAY_LEN (errset));
    items = (zmq_pollitem_t*)ruby_xmalloc (sizeof (zmq_pollitem_t) * nitems);

    /* NOTE(review): if poll_add_item raises while iterating below, `items`
       is not released; covering that path would need rb_ensure -- confirm
       whether poll_add_item can raise. */
    struct poll_state ps;
    ps.nitems = 0;
    ps.items = items;
    ps.io_objects = rb_ary_new ();

    if (!NIL_P (readset)) {
        ps.event = ZMQ_POLLIN;
        rb_iterate (rb_each, readset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    if (!NIL_P (writeset)) {
        ps.event = ZMQ_POLLOUT;
        rb_iterate (rb_each, writeset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    if (!NIL_P (errset)) {
        ps.event = ZMQ_POLLERR;
        rb_iterate (rb_each, errset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    /* Reset nitems to the actual number of zmq_pollitem_t records we're
       sending. */
    nitems = ps.nitems;

#ifdef HAVE_RUBY_INTERN_H
    /* A zero timeout never waits, so only non-zero timeouts go through
       rb_thread_blocking_region. */
    if (timeout_usec != 0) {
        struct zmq_poll_args poll_args;
        poll_args.items = items;
        poll_args.nitems = nitems;
        poll_args.timeout_usec = timeout_usec;

        rb_thread_blocking_region (zmq_poll_blocking, (void*)&poll_args,
                                   NULL, NULL);
        rc = poll_args.rc;
    }
    else
#endif
        rc = zmq_poll (items, nitems, timeout_usec);

    if (rc == -1) {
        /* Capture the error code first, then release the poll set:
           rb_raise does not return, so this is the last chance to free it. */
        int poll_errno = zmq_errno ();
        ruby_xfree (items);
        rb_raise (rb_eRuntimeError, "%s", zmq_strerror (poll_errno));
    }

    if (rc == 0) {
        /* Nothing was signalled; release the poll set before returning. */
        ruby_xfree (items);
        return Qnil;
    }

    VALUE read_active = rb_ary_new ();
    VALUE write_active = rb_ary_new ();
    VALUE err_active = rb_ary_new ();

    /* items[i] is paired with ps.io_objects[i]; an object is pushed onto
       each result list whose event bit is set in revents. */
    for (i = 0, item = &items[0]; i < nitems; i++, item++) {
        if (item->revents != 0) {
            VALUE io = RARRAY_PTR (ps.io_objects)[i];

            if (item->revents & ZMQ_POLLIN)
                rb_ary_push (read_active, io);
            if (item->revents & ZMQ_POLLOUT)
                rb_ary_push (write_active, io);
            if (item->revents & ZMQ_POLLERR)
                rb_ary_push (err_active, io);
        }
    }

    ruby_xfree (items);

    return rb_ary_new3 (3, read_active, write_active, err_active);
}