/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #define PN_USE_DEPRECATED_API 1 #include "msgr-common.h" #include "proton/message.h" #include "proton/messenger.h" #include "proton/error.h" #include #include #include #include typedef struct { Addresses_t subscriptions; uint64_t msg_count; int recv_count; int incoming_window; int timeout; // seconds unsigned int report_interval; // in seconds int outgoing_window; Addresses_t forwarding_targets; int reply; const char *name; const char *ready_text; char *certificate; char *privatekey; // used to sign certificate char *password; // for private key file char *ca_db; // trusted CA database } Options_t; static void usage(int rc) { printf("Usage: msgr-recv [OPTIONS] \n" " -a [,]* \tAddresses to listen on [amqp://~0.0.0.0]\n" " -c # \tNumber of messages to receive before exiting [0=forever]\n" " -b # \tArgument to Messenger::recv(n) [2048]\n" " -w # \tSize for incoming window [0]\n" " -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n" " -e # \t# seconds to report statistics, 0 = end of test [0] *TBD*\n" " -R \tSend reply if 'reply-to' present\n" " -W # \t# outgoing window size [0]\n" " -F [,]* \tAddresses used for forwarding received messages\n" " -N \tSet the container name to \n" " -X \tPrint '\\n' to stdout after all subscriptions are created\n" " -V \tEnable debug logging\n" " SSL options:\n" " -T \tDatabase of trusted CA certificates for validating peer\n" " -C \tFile containing self-identifying certificate\n" " -K \tFile containing private key used to sign certificate\n" " -P [pass:|path] \tPassword to unlock private key file.\n" ); exit(rc); } static void parse_options( int argc, char **argv, Options_t *opts ) { int c; opterr = 0; memset( opts, 0, sizeof(*opts) ); opts->recv_count = -1; opts->timeout = -1; addresses_init(&opts->subscriptions); addresses_init(&opts->forwarding_targets); while ((c = getopt(argc, argv, "a:c:b:w:t:e:RW:F:VN:X:T:C:K:P:")) != -1) { switch (c) { case 'a': addresses_merge( &opts->subscriptions, optarg ); break; case 'c': if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'b': if (sscanf( optarg, "%d", &opts->recv_count ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'w': if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 't': if (sscanf( optarg, "%d", &opts->timeout ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } if (opts->timeout > 0) opts->timeout *= 1000; break; case 'e': if (sscanf( optarg, "%u", &opts->report_interval ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'R': opts->reply = 1; break; case 'W': if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) { fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); usage(1); } break; case 'F': addresses_merge( &opts->forwarding_targets, optarg ); break; case 'V': enable_logging(); break; case 'N': opts->name = optarg; break; case 'X': opts->ready_text = optarg; break; case 'T': opts->ca_db = optarg; break; case 'C': opts->certificate = optarg; break; case 'K': opts->privatekey = optarg; break; case 'P': parse_password( optarg, &opts->password ); break; default: usage(1); } } // default subscription if none specified if (opts->subscriptions.count == 0) addresses_add( &opts->subscriptions, "amqp://~0.0.0.0" ); } int main(int argc, char** argv) { Options_t opts; Statistics_t stats; uint64_t sent = 0; uint64_t received = 0; int forwarding_index = 0; int rc; pn_message_t *message; pn_messenger_t *messenger; parse_options( argc, argv, &opts ); const int forward = opts.forwarding_targets.count != 0; message = pn_message(); messenger = pn_messenger( opts.name ); /* load the various command line options if they're set */ if (opts.certificate) { rc = pn_messenger_set_certificate(messenger, opts.certificate); check_messenger(messenger); check( rc == 0, "Failed to set certificate" ); } if (opts.privatekey) { rc = pn_messenger_set_private_key(messenger, opts.privatekey); check_messenger(messenger); check( rc == 0, "Failed to set private key" ); } if (opts.password) { rc = pn_messenger_set_password(messenger, opts.password); free(opts.password); check_messenger(messenger); check( rc == 0, "Failed to set password" ); } if (opts.ca_db) { rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db); check_messenger(messenger); check( rc == 0, "Failed to set trusted CA database" ); } if (opts.incoming_window) { // RAFI: seems to cause receiver to hang: pn_messenger_set_incoming_window( messenger, opts.incoming_window ); } pn_messenger_set_timeout( messenger, opts.timeout ); pn_messenger_start(messenger); check_messenger(messenger); int i; for (i = 0; i < opts.subscriptions.count; i++) { pn_messenger_subscribe(messenger, opts.subscriptions.addresses[i]); check_messenger(messenger); LOG("Subscribing to '%s'\n", opts.subscriptions.addresses[i]); } // hack to let test scripts know when the receivers are ready (so // that the senders may be started) if (opts.ready_text) { fprintf(stdout, "%s\n", opts.ready_text); fflush(stdout); } while (!opts.msg_count || received < opts.msg_count) { LOG("Calling pn_messenger_recv(%d)\n", opts.recv_count); rc = pn_messenger_recv(messenger, opts.recv_count); check_messenger(messenger); check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed"); // start the timer only after receiving the first msg if (received == 0) statistics_start( &stats ); LOG("Messages on incoming queue: %d\n", pn_messenger_incoming(messenger)); while (pn_messenger_incoming(messenger)) { pn_messenger_get(messenger, message); check_messenger(messenger); received++; // TODO: header decoding? // uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong; statistics_msg_received( &stats, message ); if (opts.reply) { const char *reply_addr = pn_message_get_reply_to( message ); if (reply_addr) { LOG("Replying to: %s\n", reply_addr ); pn_message_set_address( message, reply_addr ); pn_message_set_creation_time( message, msgr_now() ); pn_messenger_put(messenger, message); sent++; } } if (forward) { const char *forward_addr = opts.forwarding_targets.addresses[forwarding_index]; forwarding_index = NEXT_ADDRESS(opts.forwarding_targets, forwarding_index); LOG("Forwarding to: %s\n", forward_addr ); pn_message_set_address( message, forward_addr ); pn_message_set_reply_to( message, NULL ); // else points to origin sender pn_message_set_creation_time( message, msgr_now() ); pn_messenger_put(messenger, message); sent++; } } LOG("Messages received=%" PRIu64 " sent=%" PRIu64 "\n", received, sent); } // this will flush any pending sends if (pn_messenger_outgoing(messenger) > 0) { LOG("Calling pn_messenger_send()\n"); rc = pn_messenger_send(messenger, -1); check_messenger(messenger); check(rc == 0, "pn_messenger_send() failed"); } rc = pn_messenger_stop(messenger); check(rc == 0, "pn_messenger_stop() failed"); check_messenger(messenger); statistics_report( &stats, sent, received ); pn_messenger_free(messenger); pn_message_free(message); addresses_free( &opts.subscriptions ); addresses_free( &opts.forwarding_targets ); return 0; } #undef PN_USE_DEPRECATED_API