/*
 * call-seq:
 *   socket.recv(flags=0) -> message | nil
 *
 * Receives a message from the _socket_.  If there are no messages available
 * on the _socket_, the recv() function shall block until the request can be
 * satisfied.  The _flags_ argument is a combination of the flags defined
 * below:
 *
 * [ZMQ::NOBLOCK] Specifies that the operation should be performed in non-blocking mode.  If there are no messages available on the _socket_, the recv() function shall fail and return _nil_.
 *
 * If the receive fails for any reason other than EAGAIN, a RuntimeError is
 * raised whose message is the 0MQ description of the error.
 *
 * == Multi-part messages
 * A 0MQ message is composed of 1 or more message parts. 0MQ ensures atomic
 * delivery of messages; peers shall receive either all <em>message parts</em> of a
 * message or none at all.
 *
 * The total number of message parts is unlimited.
 *
 * An application wishing to determine if a message is composed of multiple
 * parts does so by retrieving the value of the ZMQ::RCVMORE socket option on the
 * socket it is receiving the message from, using getsockopt(). If there are no
 * message parts to follow, or if the message is not composed of multiple parts,
 * ZMQ::RCVMORE shall report a value of false. Otherwise, ZMQ::RCVMORE shall
 * report a value of true, indicating that more message parts are to follow.
 */
static VALUE socket_recv (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE flags_;
    
    /*  A single optional argument: the receive flags.  */
    rb_scan_args (argc_, argv_, "01", &flags_);

    void * s;
    Data_Get_Struct (self_, void, s);
    Check_Socket (s);

    /*  Omitted or nil flags mean a plain receive with no flags set.  */
    int flags = NIL_P (flags_) ? 0 : NUM2INT (flags_);

    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    assert (rc == 0);

#ifdef HAVE_RUBY_INTERN_H
    /*  Receives without ZMQ_NOBLOCK go through rb_thread_blocking_region;
        receives with ZMQ_NOBLOCK are issued directly.  */
    if (!(flags & ZMQ_NOBLOCK)) {
        struct zmq_send_recv_args recv_args;
        recv_args.socket = s;
        recv_args.msg = &msg;
        recv_args.flags = flags;
        rb_thread_blocking_region (zmq_recv_blocking, (void*) &recv_args, NULL, NULL);
        rc = recv_args.rc;
    }
    else
#endif
        rc = zmq_recv (s, &msg, flags);

    if (rc != 0) {
        /*  Capture the error code first so that the cleanup call below
            cannot overwrite it.  */
        int err = zmq_errno ();

        /*  The message must be closed before raising: rb_raise does not
            return, so nothing placed after it would ever run.  */
        rc = zmq_msg_close (&msg);
        assert (rc == 0);

        /*  EAGAIN is reported to the caller as nil, not as an exception.  */
        if (err == EAGAIN)
            return Qnil;

        rb_raise (rb_eRuntimeError, "%s", zmq_strerror (err));
        return Qnil;  /*  Not reached.  */
    }

    /*  Copy the payload into a Ruby string before releasing the message.  */
    VALUE message = rb_str_new ((char*) zmq_msg_data (&msg),
        zmq_msg_size (&msg));
    rc = zmq_msg_close (&msg);
    assert (rc == 0);
    return message;
}