/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #define PN_USE_DEPRECATED_API 1 /* * Implements a subset of msgr-send.c using reactor events. */ #include "proton/message.h" #include "proton/error.h" #include "proton/types.h" #include "proton/reactor.h" #include "proton/handlers.h" #include "proton/engine.h" #include "proton/url.h" #include "msgr-common.h" #include #include #include #include typedef struct { Addresses_t targets; uint64_t msg_count; uint32_t msg_size; // of body uint32_t send_batch; int outgoing_window; unsigned int report_interval; // in seconds //Addresses_t subscriptions; //Addresses_t reply_tos; int get_replies; int unique_message; // 1 -> create and free a pn_message_t for each send/recv int timeout; // in seconds int incoming_window; int recv_count; const char *name; char *certificate; char *privatekey; // used to sign certificate char *password; // for private key file char *ca_db; // trusted CA database } Options_t; static void usage(int rc) { printf("Usage: reactor-send [OPTIONS] \n" " -a \tThe target address [amqp[s]://domain[/name]]\n" " -c # \tNumber of messages to send before exiting [0=forever]\n" " -b # \tSize of message body in bytes [1024]\n" " -R \tWait for a reply to each sent message\n" " -V \tEnable debug logging\n" ); exit(rc); } typedef struct { Options_t *opts; Statistics_t *stats; uint64_t sent; uint64_t received; pn_message_t *message; pn_message_t *reply_message; pn_atom_t id; char *encoded_data; size_t encoded_data_size; pn_url_t *send_url; pn_string_t *hostname; pn_string_t *container_id; pn_string_t *reply_to; } sender_context_t; void sender_context_init(sender_context_t *sc, Options_t *opts, Statistics_t *stats) { sc->opts = opts; sc->stats = stats; sc->sent = 0; sc->received = 0; sc->id.type = PN_ULONG; // 4096 extra bytes should easily cover the message metadata sc->encoded_data_size = sc->opts->msg_size + 4096; sc->encoded_data = (char *)calloc(1, sc->encoded_data_size); check(sc->encoded_data, "failed to allocate encoding buffer"); sc->container_id = pn_string("reactor-send"); // prefer uuid-like name sc->reply_message = (sc->opts->get_replies) ? pn_message() : 0; sc->message = pn_message(); check(sc->message, "failed to allocate a message"); sc->reply_to = pn_string("amqp://"); pn_string_addf(sc->reply_to, "%s", pn_string_get(sc->container_id)); pn_message_set_reply_to(sc->message, pn_string_get(sc->reply_to)); pn_data_t *body = pn_message_body(sc->message); // borrow the encoding buffer this one time char *data = sc->encoded_data; pn_data_put_binary(body, pn_bytes(sc->opts->msg_size, data)); check(sc->opts->targets.count > 0, "no specified address"); sc->send_url = pn_url_parse(sc->opts->targets.addresses[0]); const char *host = pn_url_get_host(sc->send_url); const char *port = pn_url_get_port(sc->send_url); sc->hostname = pn_string(host); if (port && strlen(port)) pn_string_addf(sc->hostname, ":%s", port); } sender_context_t *sender_context(pn_handler_t *h) { return (sender_context_t *) pn_handler_mem(h); } void sender_cleanup(pn_handler_t *h) { sender_context_t *sc = sender_context(h); pn_message_free(sc->message); pn_message_free(sc->reply_message); pn_url_free(sc->send_url); pn_free(sc->hostname); pn_free(sc->container_id); pn_free(sc->reply_to); free(sc->encoded_data); } pn_handler_t *replyto_handler(sender_context_t *sc); pn_message_t* get_message(sender_context_t *sc, bool sending) { if (sc->opts->unique_message) { pn_message_t *m = pn_message(); check(m, "failed to allocate a message"); if (sending) { pn_message_set_reply_to(m, pn_string_get(sc->reply_to)); // copy the data pn_data_t *body = pn_message_body(m); pn_data_t *template_body = pn_message_body(sc->message); pn_data_put_binary(body, pn_data_get_binary(template_body)); } return m; } else return sending ? sc->message : sc->reply_message; // our simplified "message pool" } void return_message(sender_context_t *sc, pn_message_t *m) { if (sc->opts->unique_message) pn_message_free(m); } void sender_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) { sender_context_t *sc = sender_context(h); switch (type) { case PN_CONNECTION_INIT: { pn_connection_t *conn = pn_event_connection(event); pn_connection_set_container(conn, pn_string_get(sc->container_id)); pn_connection_set_hostname(conn, pn_string_get(sc->hostname)); pn_connection_open(conn); pn_session_t *ssn = pn_session(conn); pn_session_open(ssn); pn_link_t *snd = pn_sender(ssn, "sender"); const char *path = pn_url_get_path(sc->send_url); if (path && strlen(path)) { pn_terminus_set_address(pn_link_target(snd), path); pn_terminus_set_address(pn_link_source(snd), path); } pn_link_open(snd); } break; case PN_LINK_FLOW: { pn_link_t *snd = pn_event_link(event); while (pn_link_credit(snd) > 0 && sc->sent < sc->opts->msg_count) { if (sc->sent == 0) statistics_start(sc->stats); char tag[8]; void *ptr = &tag; *((uint64_t *) ptr) = sc->sent; pn_delivery_t *dlv = pn_delivery(snd, pn_dtag(tag, 8)); // setup the message to send pn_message_t *msg = get_message(sc, true);; pn_message_set_address(msg, sc->opts->targets.addresses[0]); sc->id.u.as_ulong = sc->sent; pn_message_set_correlation_id(msg, sc->id); pn_message_set_creation_time(msg, msgr_now()); size_t size = sc->encoded_data_size; int err = pn_message_encode(msg, sc->encoded_data, &size); check(err == 0, "message encoding error"); pn_link_send(snd, sc->encoded_data, size); pn_delivery_settle(dlv); sc->sent++; return_message(sc, msg); } if (sc->sent == sc->opts->msg_count && !sc->opts->get_replies) { pn_link_close(snd); pn_connection_t *conn = pn_event_connection(event); pn_connection_close(conn); } } break; case PN_LINK_INIT: { pn_link_t *link = pn_event_link(event); if (pn_link_is_receiver(link)) { // Response messages link. Could manage credit and deliveries in this handler but // a dedicated handler also works. pn_handler_t *replyto = replyto_handler(sc); pn_flowcontroller_t *fc = pn_flowcontroller(1024); pn_handler_add(replyto, fc); pn_decref(fc); pn_handshaker_t *handshaker = pn_handshaker(); pn_handler_add(replyto, handshaker); pn_decref(handshaker); pn_record_t *record = pn_link_attachments(link); pn_record_set_handler(record, replyto); pn_decref(replyto); } } break; case PN_CONNECTION_LOCAL_CLOSE: { statistics_report(sc->stats, sc->sent, sc->received); } break; default: break; } } pn_handler_t *sender_handler(Options_t *opts, Statistics_t *stats) { pn_handler_t *h = pn_handler_new(sender_dispatch, sizeof(sender_context_t), sender_cleanup); sender_context_t *sc = sender_context(h); sender_context_init(sc, opts, stats); return h; } sender_context_t *replyto_sender_context(pn_handler_t *h) { sender_context_t **p = (sender_context_t **) pn_handler_mem(h); return *p; } void replyto_cleanup(pn_handler_t *h) {} void replyto_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) { sender_context_t *sc = replyto_sender_context(h); switch (type) { case PN_DELIVERY: { check(sc->opts->get_replies, "Unexpected reply message"); pn_link_t *recv_link = pn_event_link(event); pn_delivery_t *dlv = pn_event_delivery(event); if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) { size_t encoded_size = pn_delivery_pending(dlv); check(encoded_size <= sc->encoded_data_size, "decoding buffer too small"); ssize_t n = pn_link_recv(recv_link, sc->encoded_data, encoded_size); check(n == (ssize_t)encoded_size, "read fail on reply link"); pn_message_t *msg = get_message(sc, false); int err = pn_message_decode(msg, sc->encoded_data, n); check(err == 0, "message decode error"); statistics_msg_received(sc->stats, msg); sc->received++; pn_delivery_settle(dlv); return_message(sc, msg); } if (sc->received == sc->opts->msg_count) { pn_link_close(recv_link); pn_connection_t *conn = pn_event_connection(event); pn_connection_close(conn); } } break; default: break; } } pn_handler_t *replyto_handler(sender_context_t *sc) { pn_handler_t *h = pn_handler_new(replyto_dispatch, sizeof(sender_context_t *), replyto_cleanup); sender_context_t **p = (sender_context_t **) pn_handler_mem(h); *p = sc; return h; } static void parse_options( int argc, char **argv, Options_t *opts ) { int c; opterr = 0; memset( opts, 0, sizeof(*opts) ); opts->msg_size = 1024; opts->send_batch = 1024; opts->timeout = -1; opts->recv_count = -1; opts->unique_message = 0; addresses_init(&opts->targets); while ((c = getopt(argc, argv, "ua:c:b:p:w:e:l:Rt:W:B:VN:T:C:K:P:")) != -1) { switch(c) { case 'a': { // TODO: multiple addresses? To keep tests happy, accept multiple for now, // but ignore all but the first. addresses_merge( &opts->targets, optarg ); } break; case 'c': if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'b': if (sscanf( optarg, "%u", &opts->msg_size ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'p': if (sscanf( optarg, "%u", &opts->send_batch ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'w': if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'e': if (sscanf( optarg, "%u", &opts->report_interval ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'R': opts->get_replies = 1; break; case 'u': opts->unique_message = 1; break; case 't': if (sscanf( optarg, "%d", &opts->timeout ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } if (opts->timeout > 0) opts->timeout *= 1000; break; case 'W': if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'B': if (sscanf( optarg, "%d", &opts->recv_count ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'V': enable_logging(); break; case 'N': opts->name = optarg; break; case 'T': opts->ca_db = optarg; break; case 'C': opts->certificate = optarg; break; case 'K': opts->privatekey = optarg; break; case 'P': parse_password( optarg, &opts->password ); break; default: usage(1); } } // default target if none specified if (opts->targets.count == 0) addresses_add( &opts->targets, "amqp://0.0.0.0" ); } int main(int argc, char** argv) { Options_t opts; Statistics_t stats; parse_options( argc, argv, &opts ); pn_reactor_t *reactor = pn_reactor(); pn_handler_t *sh = sender_handler(&opts, &stats); pn_handler_add(sh, pn_handshaker()); pn_reactor_connection(reactor, sh); pn_reactor_run(reactor); pn_reactor_free(reactor); pn_handler_free(sh); addresses_free(&opts.targets); free(opts.password); return 0; } #undef PN_USE_DEPRECATED_API