/*
 * call-seq:
 *   socket.send(message, flags=0) -> true | false
 *
 * Queue the message referenced by the _message_ argument to be sent to the
 * _socket_.  The _flags_ argument is a combination of the flags defined
 * below:
 *
 * [ZMQ::NOBLOCK] Specifies that the operation should be performed in non-blocking mode. If the message cannot be queued on the _socket_, the function shall fail and return _false_.
 * [ZMQ::SNDMORE] Specifies that the message being sent is a multi-part message, and that further message parts are to follow. Refer to the section regarding multi-part messages below for a detailed description.
 *
 * <b>NOTE:</b> A successful invocation of send() does not indicate that the
 * message has been transmitted to the network, only that it has been queued on
 * the socket and 0MQ has assumed responsibility for the message.
 *
 * == Multi-part messages
 * A 0MQ message is composed of 1 or more message parts. 0MQ ensures atomic
 * delivery of messages; peers shall receive either all <em>message parts</em> of a
 * message or none at all.
 *
 * The total number of message parts is unlimited.
 *
 * An application wishing to send a multi-part message does so by specifying the
 * ZMQ::SNDMORE flag to send(). The presence of this flag indicates to 0MQ
 * that the message being sent is a multi-part message and that more message
 * parts are to follow. When the application wishes to send the final message
 * part it does so by calling send() without the ZMQ::SNDMORE flag; this
 * indicates that no more message parts are to follow.
 *
 * This function returns _true_ if the message was queued and _false_ if the
 * send failed with EAGAIN (the message could not be queued right now). Any
 * other 0MQ failure raises a RuntimeError carrying the 0MQ error string.
 * _message_ must be a String.
 */
static VALUE socket_send (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE msg_, flags_;

    /*  One mandatory argument (message), one optional argument (flags).  */
    rb_scan_args (argc_, argv_, "11", &msg_, &flags_);

    void * s;
    Data_Get_Struct (self_, void, s);
    Check_Socket (s);

    /*  All argument validation that may raise happens before the 0MQ
        message is initialised, so nothing needs releasing on those paths.  */
    Check_Type (msg_, T_STRING);

    int flags = NIL_P (flags_) ? 0 : NUM2INT (flags_);

    /*  Copy the Ruby string's bytes into a freshly sized 0MQ message.  */
    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, RSTRING_LEN (msg_));
    if (rc != 0) {
        rb_raise (rb_eRuntimeError, "%s", zmq_strerror (zmq_errno ()));
        return Qnil;
    }
    memcpy (zmq_msg_data (&msg), RSTRING_PTR (msg_), RSTRING_LEN (msg_));

#ifdef HAVE_RUBY_INTERN_H
    if (!(flags & ZMQ_NOBLOCK)) {
        /*  A send without ZMQ_NOBLOCK is routed through
            rb_thread_blocking_region; its result comes back in
            send_args.rc.  */
        struct zmq_send_recv_args send_args;
        send_args.socket = s;
        send_args.msg = &msg;
        send_args.flags = flags;
        rb_thread_blocking_region (zmq_send_blocking, (void*) &send_args, NULL, NULL);
        rc = send_args.rc;
    }
    else
#endif
    {
        rc = zmq_send (s, &msg, flags);
    }

    /*  Capture the error code before any further call can overwrite it.
        NOTE(review): on the blocking-region path the code is read after
        rb_thread_blocking_region returns; confirm nothing in between
        clobbers errno.  */
    int err = (rc != 0) ? zmq_errno () : 0;

    /*  Release the message on every path. This must happen before
        rb_raise, which does not return to this function; raising first
        would leave the message unclosed.  */
    int close_rc = zmq_msg_close (&msg);
    assert (close_rc == 0);
    (void) close_rc;    /*  Silence unused-variable warnings under NDEBUG.  */

    if (rc == 0)
        return Qtrue;

    /*  EAGAIN is reported to the caller as a plain false.  */
    if (err == EAGAIN)
        return Qfalse;

    rb_raise (rb_eRuntimeError, "%s", zmq_strerror (err));
    return Qnil;
}