/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include #include typedef struct app_data_t { const char *host, *port; const char *amqp_address; pn_proactor_t *proactor; pn_listener_t *listener; pn_raw_connection_t *towake; int connects; int disconnects; /* Sender values */ /* Receiver values */ } app_data_t; typedef struct connection_data_t { bool sender; } connection_data_t; static int exit_code = 0; /* Close the connection and the listener so so we will get a * PN_PROACTOR_INACTIVE event and exit, once all outstanding events * are processed. */ static void close_all(pn_raw_connection_t *c, app_data_t *app) { if (c) pn_raw_connection_close(c); if (app->listener) pn_listener_close(app->listener); } static int check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) { if (pn_condition_is_set(cond)) { fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), pn_condition_get_name(cond), pn_condition_get_description(cond)); close_all(pn_event_raw_connection(e), app); return 1; } return 0; } static void send_message(pn_raw_connection_t *c, const char* msg) { pn_raw_buffer_t buffer; uint32_t len = strlen(msg); char *buf = (char*) malloc(1024); memcpy(buf, msg, len); buffer.bytes = buf; buffer.capacity = 1024; buffer.offset = 0; buffer.size = len; pn_raw_connection_write_buffers(c, &buffer, 1); } static void recv_message(pn_raw_buffer_t buf) { fwrite(buf.bytes, buf.size, 1, stdout); } connection_data_t *make_receiver_data(void) { connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t)); cd->sender = false; return cd; } connection_data_t *make_sender_data(void) { connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t)); cd->sender = true; return cd; } #define READ_BUFFERS 4 /* This function handles events when we are acting as the receiver */ static void handle_receive(app_data_t *app, pn_event_t* event) { switch (pn_event_type(event)) { case PN_RAW_CONNECTION_CONNECTED: { pn_raw_connection_t *c = pn_event_raw_connection(event); pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}}; int i = READ_BUFFERS; for (; i; --i) { pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i]; buff->bytes = (char*) malloc(1024); buff->capacity = 1024; buff->size = 0; buff->offset = 0; } pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS); } break; case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { } break; default: break; } } #define WRITE_BUFFERS 4 static void free_buffers(pn_raw_buffer_t buffs[], size_t n) { unsigned i; for (i=0; iconnects++; } break; case PN_RAW_CONNECTION_DISCONNECTED: { pn_raw_connection_t *c = pn_event_raw_connection(event); connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c); pn_raw_connection_set_context(c, NULL); free(cd); printf("**raw connection disconnected\n"); pn_proactor_cancel_timeout(app->proactor); app->disconnects++; check_condition(event, pn_raw_connection_condition(c), app); } break; case PN_RAW_CONNECTION_DRAIN_BUFFERS: { pn_raw_connection_t *c = pn_event_raw_connection(event); pn_raw_buffer_t buffs[READ_BUFFERS]; size_t n; while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) { free_buffers(buffs, n); } while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) { free_buffers(buffs, n); } } case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: { pn_raw_connection_t *c = pn_event_raw_connection(event); char line[120]; if (fgets(line, sizeof(line), stdin)) { send_message(c, line); } else { /* On end of file, close for write then wait 2 sec for response */ pn_raw_connection_write_close(c); app->towake = c; pn_proactor_set_timeout(app->proactor, 2000); } } break; /* This path handles received bytes */ case PN_RAW_CONNECTION_READ: { pn_raw_connection_t *c = pn_event_raw_connection(event); pn_raw_buffer_t buffs[READ_BUFFERS]; size_t n; while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) { unsigned i; for (i=0; itowake) { pn_raw_connection_wake(app->towake); app->towake = NULL; } } break; case PN_PROACTOR_INACTIVE: { return false; } break; default: { pn_raw_connection_t *c = pn_event_raw_connection(event); if (c) { connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c); if (cd && cd->sender) { handle_send(app, event); } else { handle_receive(app, event); } } } } return exit_code == 0; } void run(app_data_t *app) { /* Loop and handle events */ do { pn_event_batch_t *events = pn_proactor_wait(app->proactor); pn_event_t *e; for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { if (!handle(app, e)) { return; } } pn_proactor_done(app->proactor, events); } while(true); } int main(int argc, char **argv) { struct app_data_t app = {0}; char addr[PN_MAX_ADDR]; pn_raw_connection_t *c = pn_raw_connection(); connection_data_t *cd = make_sender_data(); app.host = (argc > 1) ? argv[1] : ""; app.port = (argc > 2) ? argv[2] : "amqp"; /* Create the proactor and connect */ app.proactor = pn_proactor(); pn_raw_connection_set_context(c, cd); pn_proactor_addr(addr, sizeof(addr), app.host, app.port); pn_proactor_raw_connect(app.proactor, c, addr); run(&app); pn_proactor_free(app.proactor); return exit_code; }