/*
 * call-seq:
 *   ZMQ.select(in, out=[], err=[], timeout=nil) -> [in, out, err] | nil
 *
 * Like IO.select, but also works with 0MQ sockets.
 */
static VALUE module_select (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE readset, writeset, errset, timeout;
    rb_scan_args (argc_, argv_, "13", &readset, &writeset, &errset, &timeout);

    long timeout_usec;
    int rc, nitems, i;
    zmq_pollitem_t *items, *item;

    if (!NIL_P (readset)) Check_Type (readset, T_ARRAY);
    if (!NIL_P (writeset)) Check_Type (writeset, T_ARRAY);
    if (!NIL_P (errset)) Check_Type (errset, T_ARRAY);
    
    if (NIL_P (timeout))
        timeout_usec = -1;
    else
        timeout_usec = (long)(NUM2DBL (timeout) * 1000000);
    
    /* Conservative estimate for nitems before we traverse the lists. */
    nitems = (NIL_P (readset) ? 0 : RARRAY_LEN (readset)) +
             (NIL_P (writeset) ? 0 : RARRAY_LEN (writeset)) +
             (NIL_P (errset) ? 0 : RARRAY_LEN (errset));
    items = (zmq_pollitem_t*)ruby_xmalloc(sizeof(zmq_pollitem_t) * nitems);

    struct poll_state ps;
    ps.nitems = 0;
    ps.items = items;
    ps.io_objects = rb_ary_new ();

    if (!NIL_P (readset)) {
        ps.event = ZMQ_POLLIN;
        rb_iterate(rb_each, readset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    if (!NIL_P (writeset)) {
        ps.event = ZMQ_POLLOUT;
        rb_iterate(rb_each, writeset, (iterfunc)poll_add_item, (VALUE)&ps);
    }

    if (!NIL_P (errset)) {
        ps.event = ZMQ_POLLERR;
        rb_iterate(rb_each, errset, (iterfunc)poll_add_item, (VALUE)&ps);
    }
    
    /* Reset nitems to the actual number of zmq_pollitem_t records we're sending. */
    nitems = ps.nitems;

#ifdef HAVE_RUBY_INTERN_H
    if (timeout_usec != 0) {
        struct zmq_poll_args poll_args;
        poll_args.items = items;
        poll_args.nitems = nitems;
        poll_args.timeout_usec = timeout_usec;

        rb_thread_blocking_region (zmq_poll_blocking, (void*)&poll_args, NULL, NULL);
        rc = poll_args.rc;
    }
    else
#endif
        rc = zmq_poll (items, nitems, timeout_usec);
    
    if (rc == -1) {
        rb_raise(rb_eRuntimeError, "%s", zmq_strerror (zmq_errno ()));
        return Qnil;
    }
    else if (rc == 0)
        return Qnil;
    
    VALUE read_active = rb_ary_new ();
    VALUE write_active = rb_ary_new ();
    VALUE err_active = rb_ary_new ();
    
    for (i = 0, item = &items[0]; i < nitems; i++, item++) {
        if (item->revents != 0) {
            VALUE io = RARRAY_PTR (ps.io_objects)[i];
            
            if (item->revents & ZMQ_POLLIN)
                rb_ary_push (read_active, io);
            if (item->revents & ZMQ_POLLOUT)
                rb_ary_push (write_active, io);
            if (item->revents & ZMQ_POLLERR)
                rb_ary_push (err_active, io);
        }
    }
    
    ruby_xfree (items);
    
    return rb_ary_new3 (3, read_active, write_active, err_active);
}