/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include #include "proton/connection_driver.h" #include "proton/engine.h" #include "proton/log.h" #include "proton/message.h" #include "proton/transport.h" #include #include #include #include // variant of the receive.c proactor example // borrowed things from the quiver arrow, too #define MAX_SIZE 1024 typedef char str[MAX_SIZE]; typedef struct app_data_t { str container_id; pn_rwbytes_t message_buffer{}; int message_count; int received = 0; pn_message_t *message; int sent = 0; pn_rwbytes_t msgout; int credit_window = 5000; int acknowledged = 0; int closed = 0; } app_data_t; static void decode_message(pn_delivery_t *dlv); static void check_condition(pn_event_t *e, pn_condition_t *cond); static void shovel(pn_connection_driver_t &sender, pn_connection_driver_t &receiver); static void handle_receiver(app_data_t *app, pn_event_t *event); static void handle_sender(app_data_t *app, pn_event_t *event); const bool VERBOSE = false; const bool ERRORS = true; // useful for debugging static void turn_on_transport_logging(pn_transport_t& transport) { pn_log_enable(true); pn_transport_trace(&transport, PN_TRACE_FRM | PN_TRACE_EVT); } static void BM_InitCloseSender(benchmark::State &state) { if (VERBOSE) printf("BEGIN BM_InitCloseSender\n"); for (auto _ : state) { pn_connection_driver_t sender; if (pn_connection_driver_init(&sender, NULL, NULL) != 0) { printf("sender: pn_connection_driver_init failed\n"); exit(1); } pn_connection_driver_close(&sender); pn_connection_driver_destroy(&sender); } if (VERBOSE) printf("END BM_InitCloseSender\n"); } BENCHMARK(BM_InitCloseSender)->Unit(benchmark::kMicrosecond); static void BM_InitCloseReceiver(benchmark::State &state) { if (VERBOSE) printf("BEGIN BM_InitCloseReceiver\n"); for (auto _ : state) { pn_connection_driver_t receiver; if (pn_connection_driver_init(&receiver, NULL, NULL) != 0) { printf("receiver: pn_connection_driver_init failed\n"); exit(1); } pn_connection_driver_close(&receiver); pn_connection_driver_destroy(&receiver); } if (VERBOSE) printf("END BM_InitCloseReceiver\n"); } BENCHMARK(BM_InitCloseReceiver)->Unit(benchmark::kMicrosecond); // sends single message and closes the connection static void BM_EstablishConnection(benchmark::State &state) { if (VERBOSE) printf("BEGIN BM_EstablishConnection\n"); for (auto _ : state) { app_data_t app = {}; app.message_count = 1; app.message = pn_message(); sprintf(app.container_id, "%s:%06x", "BM_EstablishConnection", rand() & 0xffffff); pn_connection_driver_t receiver; if (pn_connection_driver_init(&receiver, NULL, NULL) != 0) { printf("receiver: pn_connection_driver_init failed\n"); exit(1); } pn_connection_driver_t sender; if (pn_connection_driver_init(&sender, NULL, NULL) != 0) { printf("sender: pn_connection_driver_init failed\n"); exit(1); } do { pn_event_t *event; while ((event = pn_connection_driver_next_event(&sender)) != NULL) { handle_sender(&app, event); } shovel(sender, receiver); while ((event = pn_connection_driver_next_event(&receiver)) != NULL) { handle_receiver(&app, event); } shovel(receiver, sender); } while (app.closed < 2); // until both sender and receiver are closed pn_message_free(app.message); pn_connection_driver_close(&receiver); pn_connection_driver_close(&sender); shovel(receiver, sender); shovel(sender, receiver); // this can take long time, up to 500 ms pn_connection_driver_destroy(&receiver); pn_connection_driver_destroy(&sender); } state.SetLabel("connections"); state.SetItemsProcessed(state.iterations()); if (VERBOSE) printf("END BM_EstablishConnection\n"); } BENCHMARK(BM_EstablishConnection)->Unit(benchmark::kMicrosecond); static void BM_SendReceiveMessages(benchmark::State &state) { if (VERBOSE) printf("BEGIN BM_SendReceiveMessages\n"); app_data_t app = {}; app.message_count = -1; // unlimited app.message = pn_message(); app.credit_window = state.range(0); sprintf(app.container_id, "%s:%06x", "BM_SendReceiveMessages", rand() & 0xffffff); pn_connection_driver_t receiver; if (pn_connection_driver_init(&receiver, NULL, NULL) != 0) { printf("receiver: pn_connection_driver_init failed\n"); exit(1); } pn_connection_driver_t sender; if (pn_connection_driver_init(&sender, NULL, NULL) != 0) { printf("sender: pn_connection_driver_init failed\n"); exit(1); } for (auto _ : state) { pn_event_t *event; while ((event = pn_connection_driver_next_event(&sender)) != NULL) { handle_sender(&app, event); } shovel(sender, receiver); while ((event = pn_connection_driver_next_event(&receiver)) != NULL) { handle_receiver(&app, event); } shovel(receiver, sender); } pn_connection_driver_close(&receiver); pn_connection_driver_close(&sender); shovel(receiver, sender); shovel(sender, receiver); // this can take long time, up to 500 ms pn_connection_driver_destroy(&receiver); pn_connection_driver_destroy(&sender); state.SetLabel("messages"); state.SetItemsProcessed(app.acknowledged); if (VERBOSE) printf("END BM_SendReceiveMessages\n"); } BENCHMARK(BM_SendReceiveMessages) ->RangeMultiplier(3) ->Range(1, 200000) ->ArgName("creditWindow") ->Arg(1000) ->Unit(benchmark::kMillisecond); ///* Create a message with a map { "sequence" : number } encode it and return /// the encoded buffer. */ static void send_message(app_data_t *app, pn_link_t *sender) { /* Construct a message with the map { "sequence": 42 } */ pn_data_t *body; pn_message_clear(app->message); body = pn_message_body(app->message); pn_data_put_int(pn_message_id(app->message), app->sent); /* Set the message_id also */ pn_data_put_map(body); pn_data_enter(body); pn_data_put_string(body, pn_bytes(sizeof("sequence") - 1, "sequence")); pn_data_put_int(body, 42); /* The sequence number */ pn_data_exit(body); if (pn_message_send(app->message, sender, &app->message_buffer) < 0) { fprintf(stderr, "error sending message: %s\n", pn_error_text(pn_message_error(app->message))); exit(1); } } static void handle_sender(app_data_t *app, pn_event_t *event) { switch (pn_event_type(event)) { case PN_CONNECTION_INIT: { pn_connection_t *c = pn_event_connection(event); pn_connection_set_container(c, "sendercid"); pn_connection_open(c); pn_session_t *s = pn_session(c); pn_session_open(s); pn_link_t *l = pn_sender(s, "my_sender"); pn_terminus_set_address(pn_link_target(l), "example"); pn_link_set_snd_settle_mode(l, PN_SND_UNSETTLED); pn_link_set_rcv_settle_mode(l, PN_RCV_FIRST); pn_link_open(l); } break; case PN_LINK_FLOW: { if (VERBOSE) printf("BEGIN handle_sender: PN_LINK_FLOW\n"); /* The peer has given us some credit, now we can send messages */ pn_link_t *sender = pn_event_link(event); while (pn_link_credit(sender) > 0 && app->sent != app->message_count) { ++app->sent; /* Use sent counter as unique delivery tag. */ pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); send_message(app, sender); } break; } case PN_DELIVERY: { /* We received acknowledgement from the peer that a message was delivered. */ pn_delivery_t *d = pn_event_delivery(event); if (pn_delivery_remote_state(d) == PN_ACCEPTED) { if (VERBOSE) printf("got PN_ACCEPTED\n"); if (++app->acknowledged == app->message_count) { if (VERBOSE) printf("%d messages sent and acknowledged\n", app->acknowledged); pn_connection_close(pn_event_connection(event)); /* Continue handling events till we receive TRANSPORT_CLOSED */ } } else { fprintf(stderr, "unexpected delivery state %d\n", (int) pn_delivery_remote_state(d)); pn_connection_close(pn_event_connection(event)); exit(EXIT_FAILURE); } break; } case PN_CONNECTION_REMOTE_OPEN: pn_connection_open(pn_event_connection(event)); /* Complete the open */ break; case PN_SESSION_REMOTE_OPEN: pn_session_open(pn_event_session(event)); break; case PN_TRANSPORT_ERROR: check_condition(event, pn_transport_condition(pn_event_transport(event))); pn_connection_close(pn_event_connection(event)); break; case PN_CONNECTION_REMOTE_CLOSE: check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); pn_connection_close(pn_event_connection(event)); break; case PN_SESSION_REMOTE_CLOSE: check_condition(event, pn_session_remote_condition(pn_event_session(event))); pn_connection_close(pn_event_connection(event)); break; case PN_LINK_REMOTE_CLOSE: case PN_LINK_REMOTE_DETACH: check_condition(event, pn_link_remote_condition(pn_event_link(event))); pn_connection_close(pn_event_connection(event)); break; case PN_TRANSPORT_CLOSED: app->closed++; break; default: break; } } static void handle_receiver(app_data_t *app, pn_event_t *event) { switch (pn_event_type(event)) { case PN_CONNECTION_INIT: { pn_connection_t *c = pn_event_connection(event); pn_connection_set_container(c, app->container_id); // pn_connection_open(c); pn_session_t *s = pn_session(c); pn_session_open(s); pn_link_t *l = pn_receiver(s, "my_receiver"); pn_terminus_set_address(pn_link_source(l), "example"); pn_link_open(l); } break; case PN_LINK_REMOTE_OPEN: { pn_link_t *l = pn_event_link(event); pn_terminus_t *t = pn_link_target(l); pn_terminus_t *rt = pn_link_remote_target(l); pn_terminus_set_address(t, pn_terminus_get_address(rt)); pn_link_open(l); if (pn_link_is_receiver(l)) { pn_link_flow(l, app->credit_window); } break; } case PN_DELIVERY: { /* A message has been received */ pn_link_t *link = NULL; pn_delivery_t *dlv = pn_event_delivery(event); if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { link = pn_delivery_link(dlv); decode_message(dlv); /* Accept the delivery */ pn_delivery_update(dlv, PN_ACCEPTED); /* done with the delivery, move to the next and free it */ pn_link_advance(link); pn_delivery_settle(dlv); /* dlv is now freed */ } pn_link_flow(link, app->credit_window - pn_link_credit(link)); } break; case PN_CONNECTION_REMOTE_OPEN: pn_connection_open(pn_event_connection(event)); /* Complete the open */ break; case PN_SESSION_REMOTE_OPEN: pn_session_open(pn_event_session(event)); break; case PN_TRANSPORT_ERROR: check_condition(event, pn_transport_condition(pn_event_transport(event))); pn_connection_close(pn_event_connection(event)); break; case PN_CONNECTION_REMOTE_CLOSE: check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); pn_connection_close(pn_event_connection(event)); break; case PN_SESSION_REMOTE_CLOSE: check_condition(event, pn_session_remote_condition(pn_event_session(event))); pn_connection_close(pn_event_connection(event)); break; case PN_LINK_REMOTE_CLOSE: case PN_LINK_REMOTE_DETACH: check_condition(event, pn_link_remote_condition(pn_event_link(event))); pn_connection_close(pn_event_connection(event)); break; case PN_TRANSPORT_CLOSED: app->closed++; break; default: break; } } static void check_condition(pn_event_t *e, pn_condition_t *cond) { if (VERBOSE) printf("beginning check_condition\n"); if (pn_condition_is_set(cond)) { if (VERBOSE || ERRORS) fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), pn_condition_get_name(cond), pn_condition_get_description(cond)); } } ///* Copies output from first connection driver into input of the second. */ static void shovel(pn_connection_driver_t &sender, pn_connection_driver_t &receiver) { pn_bytes_t wbuf = pn_connection_driver_write_buffer(&receiver); if (wbuf.size == 0) { pn_connection_driver_write_done(&receiver, 0); return; } pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&sender); if (rbuf.start == NULL) { printf("shovel: rbuf.start is null\n"); fflush(stdout); fflush(stderr); exit(1); } size_t s = rbuf.size < wbuf.size ? rbuf.size : wbuf.size; memcpy(rbuf.start, wbuf.start, s); pn_connection_driver_read_done(&sender, s); pn_connection_driver_write_done(&receiver, s); } static void decode_message(pn_delivery_t *dlv) { static char buffer[MAX_SIZE]; ssize_t len; // try to decode the message body if (pn_delivery_pending(dlv) < MAX_SIZE) { // read in the raw data len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); if (len > 0) { // decode it into a proton message pn_message_t *m = pn_message(); if (PN_OK == pn_message_decode(m, buffer, len)) { pn_string_t *s = pn_string(NULL); pn_inspect(pn_message_body(m), s); if (VERBOSE) printf("%s\n", pn_string_get(s)); pn_free(s); } pn_message_free(m); } } }