// // Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #include <ctype.h> #include <errno.h> #include <stdarg.h> #include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include "nng.h" #include "protocol/bus0/bus.h" #include "protocol/pair0/pair.h" #include "protocol/pair1/pair.h" #include "protocol/pipeline0/pull.h" #include "protocol/pipeline0/push.h" #include "protocol/pubsub0/pub.h" #include "protocol/pubsub0/sub.h" #include "protocol/reqrep0/rep.h" #include "protocol/reqrep0/req.h" #include "protocol/survey0/respond.h" #include "protocol/survey0/survey.h" #include "supplemental/tls/tls.h" #include "supplemental/util/options.h" #include "supplemental/util/platform.h" #include "transport/zerotier/zerotier.h" // Globals. We need this to avoid passing around everything. int format = 0; int proto = 0; int verbose = 0; int delay = 0; nng_duration interval = NNG_DURATION_INFINITE; nng_duration sendtimeo = NNG_DURATION_INFINITE; nng_duration recvtimeo = NNG_DURATION_INFINITE; void * data = NULL; size_t datalen = 0; int compat = 0; int async = 0; int insecure = 0; void * cacert = NULL; size_t cacertlen = 0; void * keyfile = NULL; size_t keylen = 0; void * certfile = NULL; size_t certlen = 0; const char * zthome = NULL; int count = 0; int recvmaxsz = -1; // Options, must start at 1 because zero is sentinel. enum options { OPT_HELP = 1, OPT_VERBOSE, OPT_SILENT, OPT_REP0, OPT_REQ0, OPT_PUSH0, OPT_PULL0, OPT_PUB0, OPT_SUB0, OPT_SURVEY0, OPT_RESPOND0, OPT_PAIR0, OPT_PAIR1, OPT_PAIR, OPT_BUS0, OPT_DIAL, OPT_LISTEN, OPT_DATA, OPT_FILE, OPT_SUBSCRIBE, OPT_INTERVAL, OPT_DELAY, OPT_COUNT, OPT_FORMAT, OPT_RAW, OPT_ASCII, OPT_QUOTED, OPT_MSGPACK, OPT_HEX, OPT_BLANK, // no printing, not an actual option. OPT_COMPAT, OPT_ASYNC, OPT_RCV_TIMEO, OPT_SND_TIMEO, OPT_SOCK_NAME, OPT_LISTEN_IPC, OPT_DIAL_IPC, OPT_LISTEN_LOCAL, OPT_DIAL_LOCAL, OPT_INSECURE, OPT_CACERT, OPT_KEYFILE, OPT_CERTFILE, OPT_VERSION, OPT_RECVMAXSZ, OPT_ZTHOME, }; static nng_optspec opts[] = { { .o_name = "help", .o_short = 'h', .o_val = OPT_HELP }, { .o_name = "verbose", .o_short = 'v', .o_val = OPT_VERBOSE }, { .o_name = "silent", .o_short = 'q', .o_val = OPT_SILENT }, { .o_name = "req0", .o_val = OPT_REQ0 }, { .o_name = "rep0", .o_val = OPT_REP0 }, { .o_name = "push0", .o_val = OPT_PUSH0 }, { .o_name = "pull0", .o_val = OPT_PULL0 }, { .o_name = "pub0", .o_val = OPT_PUB0 }, { .o_name = "sub", .o_val = OPT_SUB0 }, // explicit for alias { .o_name = "sub0", .o_val = OPT_SUB0 }, { .o_name = "surveyor0", .o_val = OPT_SURVEY0 }, { .o_name = "respondent0", .o_val = OPT_RESPOND0 }, { .o_name = "pair", .o_val = OPT_PAIR }, { .o_name = "pair0", .o_val = OPT_PAIR0 }, { .o_name = "pair1", .o_val = OPT_PAIR1 }, { .o_name = "bus0", .o_val = OPT_BUS0 }, { .o_name = "dial", .o_val = OPT_DIAL, .o_arg = true }, { .o_name = "listen", .o_val = OPT_LISTEN, .o_arg = true }, { .o_name = "data", .o_short = 'D', .o_val = OPT_DATA, .o_arg = true }, { .o_name = "file", .o_short = 'F', .o_val = OPT_FILE, .o_arg = true }, { .o_name = "subscribe", .o_val = OPT_SUBSCRIBE, .o_arg = true }, { .o_name = "format", .o_val = OPT_FORMAT, .o_arg = true }, { .o_name = "ascii", .o_short = 'A', .o_val = OPT_ASCII }, { .o_name = "quoted", .o_short = 'Q', .o_val = OPT_QUOTED }, { .o_name = "raw", .o_val = OPT_RAW }, { .o_name = "hex", .o_val = OPT_HEX }, { .o_name = "compat", .o_val = OPT_COMPAT }, { .o_name = "async", .o_val = OPT_ASYNC }, { .o_name = "recv-maxsz", .o_short = 'Z', .o_val = OPT_RECVMAXSZ, .o_arg = true, }, { .o_name = "count", .o_short = 'C', .o_val = OPT_COUNT, .o_arg = true, }, { .o_name = "delay", .o_short = 'd', .o_val = OPT_DELAY, .o_arg = true, }, { .o_name = "interval", .o_short = 'i', .o_val = OPT_INTERVAL, .o_arg = true, }, { .o_name = "recv-timeout", .o_val = OPT_RCV_TIMEO, .o_arg = true }, { .o_name = "send-timeout", .o_val = OPT_SND_TIMEO, .o_arg = true }, { .o_name = "socket-name", .o_val = OPT_SOCK_NAME, .o_arg = true }, // Legacy compatibility with nanocat. { .o_name = "bind", .o_val = OPT_LISTEN, .o_arg = true }, { .o_name = "connect", .o_val = OPT_DIAL, .o_arg = true }, { .o_name = "bind-ipc", .o_short = 'X', .o_val = OPT_LISTEN_IPC, .o_arg = true, }, { .o_name = "connect-ipc", .o_short = 'x', .o_val = OPT_DIAL_IPC, .o_arg = true, }, { .o_name = "bind-local", .o_short = 'L', .o_val = OPT_LISTEN_LOCAL, .o_arg = true, }, { .o_name = "connect-local", .o_short = 'l', .o_val = OPT_DIAL_LOCAL, .o_arg = true, }, { .o_name = "insecure", .o_short = 'k', .o_val = OPT_INSECURE }, { .o_name = "cacert", .o_val = OPT_CACERT, .o_arg = true }, { .o_name = "key", .o_val = OPT_KEYFILE, .o_arg = true }, { .o_name = "cert", .o_short = 'E', .o_val = OPT_CERTFILE, .o_arg = true, }, { .o_name = "zt-home", .o_val = OPT_ZTHOME, .o_arg = true, }, { .o_name = "version", .o_short = 'V', .o_val = OPT_VERSION }, // Sentinel. { .o_name = NULL, .o_val = 0 }, }; static void fatal(const char *msg, ...) { va_list ap; va_start(ap, msg); vfprintf(stderr, msg, ap); va_end(ap); fprintf(stderr, "\n"); exit(1); } static void help(void) { printf( "Usage: nngcat <proto> <addr>... [<fmt>] [<opts>...] [<src>]\n\n"); printf("<proto> must be one of:\n"); printf(" --req0 (or alias --req)\n"); printf(" --rep0 (or alias --rep)\n"); printf(" --pub0 (or alias --pub)\n"); printf(" --sub0 (or alias --sub)\n"); printf(" --push0 (or alias --push)\n"); printf(" --pull0 (or alias --pull)\n"); printf(" --surveyor0 (or alias --surveyor)\n"); printf(" --respondent0 (or alias --respondent)\n"); printf(" --pair0\n"); printf(" --pair1\n"); printf(" --pair (alias for either pair0 or pair1)\n"); printf("\n<addr> must be one or more of:\n"); printf(" --dial <url> (or alias --connect <url>)\n"); printf(" --listen <url> (or alias --bind <url>)\n"); printf(" --bind-local <port> (or alias -L <port>)\n"); printf(" --connect-local <port> (or alias -l <port>)\n"); printf(" --bind-ipc <path> (or alias -X <path>)\n"); printf(" --connect-ipc <path> (or alias -x <path>)\n"); printf("\n<fmt> may be one or none of:\n"); printf(" --format <no|ascii|hex|msgpack|quoted|raw>\n"); printf(" --ascii (or alias -A)\n"); printf(" --quoted (or alias -Q)\n"); printf(" --hex\n"); printf(" --msgpack\n"); printf(" --raw\n"); printf("\n<opts> may be any of:\n"); printf(" --subscribe <topic> (only with --sub protocol)\n"); printf(" --silent (or alias -q)\n"); printf(" --verbose (or alias -v)\n"); printf(" --count <num> (or alias -C <num>)\n"); printf(" --delay <secs> (or alias -d <secs>)\n"); printf(" --interval <secs> (or alias -i <secs>)\n"); printf(" --recv-timeout <secs>\n"); printf(" --send-timeout <secs>\n"); printf(" --recv-maxsz <size> (or alias -Z <size>)\n"); printf(" --compat\n"); printf(" --async\n"); printf(" --insecure (or alias -k)\n"); printf(" --cacert <file>\n"); printf(" --cert <file> (or alias -E <file>)\n"); printf(" --key <file>\n"); printf(" --zt-home <path>\n"); printf("\n<src> may be one of:\n"); printf(" --file <file> (or alias -F <file>)\n"); printf(" --data <data> (or alias -D <data>)\n"); exit(1); } static int intarg(const char *val, int maxv) { int v = 0; if (val[0] == '\0') { fatal("Empty integer argument."); } while (*val != '\0') { if (!isdigit(*val)) { fatal("Integer argument expected."); } v *= 10; v += ((*val) - '0'); val++; if (v > maxv) { fatal("Integer argument too large."); } } if (v < 0) { fatal("Integer argument overflow."); } return (v); } // This reads a file into memory. Care is taken to ensure that // the buffer is one byte larger and contains a terminating // NUL. (Useful for key files and such.) static void loadfile(const char *path, void **datap, size_t *lenp) { FILE * f; char * data; size_t len; if ((f = fopen(path, "r")) == NULL) { fatal("Cannot open file %s: %s", path, strerror(errno)); } if (fseek(f, 0, SEEK_END) != 0) { fatal("Cannot seek to end of file: %s", strerror(errno)); } len = ftell(f); (void) fseek(f, 0, SEEK_SET); if ((data = malloc(len + 1)) == NULL) { fatal("Out of memory."); } data[len] = '\0'; if (fread(data, 1, len, f) != len) { fatal("Read file %s failed: %s", path, strerror(errno)); } fclose(f); *datap = data; *lenp = len; } static void configtls(nng_tls_config *tls) { int rv = 0; if (insecure) { rv = nng_tls_config_auth_mode(tls, NNG_TLS_AUTH_MODE_NONE); } if ((rv == 0) && (certfile != NULL)) { keyfile = keyfile ? keyfile : certfile; rv = nng_tls_config_own_cert(tls, certfile, keyfile, NULL); } if ((rv == 0) && (cacert != NULL)) { rv = nng_tls_config_ca_chain(tls, cacert, NULL); } if (rv != 0) { fatal("Unable to configure TLS: %s", nng_strerror(rv)); } } struct addr { struct addr *next; int mode; char * val; }; struct addr ** addaddr(struct addr **endp, int mode, const char *a) { struct addr *na; if (((na = malloc(sizeof(*na))) == NULL) || ((na->val = malloc(strlen(a) + 1)) == NULL)) { fatal("Out of memory."); } na->mode = mode; memcpy(na->val, a, strlen(a) + 1); na->next = NULL; *endp = na; return (&na->next); } struct topic { struct topic *next; char * val; }; struct topic ** addtopic(struct topic **endp, const char *s) { struct topic *t; if (((t = malloc(sizeof(*t))) == NULL) || ((t->val = malloc(strlen(s) + 1)) == NULL)) { fatal("Out of memory."); } memcpy(t->val, s, strlen(s) + 1); t->next = NULL; *endp = t; return (&t->next); } void printmsg(char *buf, size_t len) { switch (format) { case OPT_BLANK: // Suppress contents. return; case OPT_RAW: // Just emit raw contents. fwrite(buf, 1, len, stdout); break; case OPT_ASCII: // ASCII, but non-ASCII substituted with '.' for (size_t i = 0; i < len; i++) { if (isprint(buf[i])) { putchar(buf[i]); } else { putchar('.'); } } break; case OPT_QUOTED: // C style quoted strings. putchar('"'); for (size_t i = 0; i < len; i++) { switch (buf[i]) { case '\n': putchar('\\'); putchar('n'); break; case '\r': putchar('\\'); putchar('r'); break; case '\t': putchar('\\'); putchar('t'); break; case '"': case '\\': putchar('\\'); putchar(buf[i]); break; default: if (isprint(buf[i])) { fputc(buf[i], stdout); } else { printf("\\x%02x", (uint8_t) buf[i]); } } } putchar('"'); putchar('\n'); break; case OPT_MSGPACK: // MsgPack, we just encode prefix + len, then // raw. if (len < 256) { putchar('\xc4'); putchar(len & 0xff); } else if (len < 65536) { putchar('\xc5'); putchar((len >> 8) & 0xff); putchar(len & 0xff); } else { putchar('\xc6'); putchar((len >> 24) & 0xff); putchar((len >> 16) & 0xff); putchar((len >> 8) & 0xff); putchar(len & 0xff); } fwrite(buf, 1, len, stdout); break; case OPT_HEX: // Hex, quoted C string encoded with hex // literals. putchar('"'); for (size_t i = 0; i < len; i++) { printf("\\x%02x", (uint8_t) buf[i]); } putchar('"'); putchar('\n'); break; } fflush(stdout); } void recvloop(nng_socket sock) { int iters = 0; for (;;) { int rv; nng_msg *msg; if ((count > 0) && (iters >= count)) { break; } rv = nng_recvmsg(sock, &msg, 0); iters++; switch (rv) { case NNG_ETIMEDOUT: case NNG_ESTATE: // Either a regular timeout, or we reached the // end of an event like a survey completing. return; case 0: printmsg(nng_msg_body(msg), nng_msg_len(msg)); nng_msg_free(msg); continue; default: fatal("Receive error: %s", nng_strerror(rv)); } } } void resploop(nng_socket sock) { int iters = 0; for (;;) { int rv; nng_msg *msg; rv = nng_recvmsg(sock, &msg, 0); if (rv != 0) { fatal("Receive error: %s", nng_strerror(rv)); } printmsg(nng_msg_body(msg), nng_msg_len(msg)); nng_msg_clear(msg); if ((rv = nng_msg_append(msg, data, datalen)) != 0) { fatal(nng_strerror(rv)); } if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { fatal("Send error: %s", nng_strerror(rv)); } iters++; if ((count > 0) && (iters >= count)) { break; } } nng_msleep(200); } void sendloop(nng_socket sock) { int iters = 0; if (data == NULL) { fatal("No data to send (specify with --data or --file)"); } if (delay > 0) { nng_msleep(delay); } for (;;) { int rv; nng_msg * msg; nng_time start; nng_time end; nng_duration delta; start = nng_clock(); if (((rv = nng_msg_alloc(&msg, 0)) != 0) || ((rv = nng_msg_append(msg, data, datalen)) != 0)) { fatal(nng_strerror(rv)); } if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { fatal("Send error: %s", nng_strerror(rv)); } end = nng_clock(); delta = (nng_duration)(end - start); iters++; // By default, we don't loop. if (((interval < 0) && (count == 0)) || ((count > 0) && (iters >= count))) { break; } // We sleep, but we account for time spent, so that our // interval appears more or less constant. Of course // if we took more than the interval here, then we skip // the sleep altogether. if ((delta >= 0) && (delta < interval)) { nng_msleep(interval - delta); } } // Wait a bit to give queues a chance to drain. nng_msleep(200); } void sendrecv(nng_socket sock) { int iters = 0; if (data == NULL) { fatal("No data to send (specify with --data or --file)"); } if (delay > 0) { nng_msleep(delay); } // We start by sending a message, then we receive iteratively // but we set a concrete timeout if interval is set, to ensure // that we exit the receive loop, and can continue. for (;;) { int rv; nng_msg * msg; nng_time start; nng_time end; nng_duration delta; start = nng_clock(); if (((rv = nng_msg_alloc(&msg, 0)) != 0) || ((rv = nng_msg_append(msg, data, datalen)) != 0)) { fatal(nng_strerror(rv)); } if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { fatal("Send error: %s", nng_strerror(rv)); } if ((interval < 0) && (count == 0)) { // Only one iteration through. recvloop(sock); break; } // We would like to use recvloop, but we need to reset // our timeout each time, as the timer counts down // towards zero. Furthermore, with survey, we don't // want to increment the iteration count. for (;;) { delta = (nng_duration)(nng_clock() - start); nng_duration expire = interval - delta; if ((recvtimeo >= 0) && (expire > recvtimeo)) { expire = recvtimeo; } rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, expire); if (rv != 0) { fatal("Cannot set recv timeout: %s", nng_strerror(rv)); } rv = nng_recvmsg(sock, &msg, 0); switch (rv) { case 0: printmsg(nng_msg_body(msg), nng_msg_len(msg)); nng_msg_free(msg); continue; case NNG_ETIMEDOUT: case NNG_ESTATE: // We're done receiving break; default: fatal("Cannot receive: %s", nng_strerror(rv)); break; } // We're done receiving, break out. break; } end = nng_clock(); delta = (nng_duration)(end - start); iters++; if ((count > 0) && (iters >= count)) { break; } // We sleep, but we account for time spent, so that our // interval appears more or less constant. Of course // if we took more than the interval here, then we skip // the sleep altogether. if ((delta >= 0) && (delta < interval)) { nng_msleep(interval - delta); } } } int main(int ac, char **av) { int idx; char * arg; int val; int rv; char scratch[512]; struct addr * addrs = NULL; struct addr ** addrend; struct topic * topics = NULL; struct topic **topicend; nng_socket sock; int port; idx = 1; addrend = &addrs; topicend = &topics; while ((rv = nng_opts_parse(ac, av, opts, &val, &arg, &idx)) == 0) { switch (val) { case OPT_HELP: help(); break; case OPT_REQ0: case OPT_REP0: case OPT_SUB0: case OPT_PUB0: case OPT_BUS0: case OPT_SURVEY0: case OPT_RESPOND0: case OPT_PAIR0: case OPT_PAIR1: case OPT_PUSH0: case OPT_PULL0: if (proto != 0) { fatal("Only one protocol may be " "specified."); } proto = val; break; case OPT_DIAL: case OPT_LISTEN: addrend = addaddr(addrend, val, arg); break; case OPT_DIAL_LOCAL: case OPT_LISTEN_LOCAL: port = intarg(arg, 65536); snprintf(scratch, sizeof(scratch), "tcp://127.0.0.1:%d", port); addrend = addaddr(addrend, val, scratch); break; case OPT_DIAL_IPC: case OPT_LISTEN_IPC: snprintf(scratch, sizeof(scratch), "ipc:///%s", arg); addrend = addaddr(addrend, val, scratch); break; case OPT_COUNT: count = intarg(arg, 0x7fffffff); break; case OPT_SUBSCRIBE: topicend = addtopic(topicend, arg); break; case OPT_VERBOSE: case OPT_SILENT: verbose = val; break; case OPT_DELAY: delay = intarg(arg, 86400) * 1000; // max 1 day break; case OPT_INTERVAL: interval = intarg(arg, 86400) * 1000; // max 1 day break; case OPT_SND_TIMEO: sendtimeo = intarg(arg, 86400) * 1000; // max 1 day break; case OPT_RCV_TIMEO: recvtimeo = intarg(arg, 86400) * 1000; // max 1 day break; case OPT_RECVMAXSZ: recvmaxsz = intarg(arg, 0x7fffffff); if (recvmaxsz == 0) { recvmaxsz = 0x7fffffff; } break; case OPT_COMPAT: compat = 1; break; case OPT_ASYNC: async = NNG_FLAG_NONBLOCK; break; case OPT_ASCII: case OPT_RAW: case OPT_QUOTED: case OPT_MSGPACK: case OPT_HEX: if (format != 0) { fatal("Format may be specified only " "once."); } format = val; break; case OPT_FORMAT: if (format != 0) { fatal("Format may be specified only " "once."); } if (strcmp(arg, "no") == 0) { format = OPT_BLANK; } else if (strcmp(arg, "ascii") == 0) { format = OPT_ASCII; } else if (strcmp(arg, "hex") == 0) { format = OPT_HEX; } else if (strcmp(arg, "quoted") == 0) { format = OPT_QUOTED; } else if (strcmp(arg, "raw") == 0) { format = OPT_RAW; } else if (strcmp(arg, "msgpack") == 0) { format = OPT_MSGPACK; } else { fatal("Invalid format specified."); } break; case OPT_FILE: if (data != NULL) { fatal("Data (--file, --data) may be " "specified " "only once."); } loadfile(arg, &data, &datalen); break; case OPT_DATA: if (data != NULL) { fatal("Data (--file, --data) may be " "specified " "only once."); } if ((data = malloc(strlen(arg) + 1)) == NULL) { fatal("Out of memory."); } memcpy(data, arg, strlen(arg) + 1); datalen = strlen(arg); break; case OPT_CACERT: if (cacert != NULL) { fatal("CA Certificate (--cacert) may be " "specified only once."); } loadfile(arg, &cacert, &cacertlen); break; case OPT_KEYFILE: if (keyfile != NULL) { fatal( "Key (--key) may be specified only once."); } loadfile(arg, &keyfile, &keylen); break; case OPT_CERTFILE: if (certfile != NULL) { fatal("Cert (--cert) may be specified " "only " "once."); } loadfile(arg, &certfile, &certlen); break; case OPT_ZTHOME: zthome = arg; break; case OPT_INSECURE: insecure = 1; break; case OPT_VERSION: printf("%s\n", nng_version()); exit(0); } } switch (rv) { case NNG_EINVAL: fatal("Option %s is invalid.", av[idx]); break; case NNG_EAMBIGUOUS: fatal("Option %s is ambiguous (specify in full).", av[idx]); break; case NNG_ENOARG: fatal("Option %s requires argument.", av[idx]); break; } if (addrs == NULL) { fatal("No address specified."); } if (compat) { if (async != 0) { fatal("Options --async and --compat are " "incompatible."); } if (count != 0) { fatal("Options --count and --compat are " "incompatible."); } if (proto == OPT_PAIR) { proto = OPT_PAIR0; } if (proto == OPT_PAIR1) { fatal("Protocol does not support --compat."); } async = NNG_FLAG_NONBLOCK; } else { if (proto == OPT_PAIR) { proto = OPT_PAIR1; } } if (proto == OPT_SUB0) { if (topics == NULL) { topicend = addtopic(topicend, ""); // subscribe to all } } else { if (topics != NULL) { fatal("Protocol does not support --subscribe."); } } switch (proto) { case OPT_PULL0: case OPT_SUB0: if (data != NULL) { fatal("Protocol does not support --file or " "--data."); } if (interval >= 0) { fatal("Protocol does not support --interval."); } break; case OPT_PUSH0: case OPT_PUB0: if (format != 0) { fatal("Protocol does not support --format " "options."); } if (data == NULL) { fatal("Protocol requires either --file or " "--data."); } break; case OPT_SURVEY0: case OPT_REQ0: if (data == NULL) { fatal("Protocol requires either --file or " "--data."); } break; case OPT_REP0: case OPT_RESPOND0: if (interval >= 0) { fatal("Protocol does not support --interval."); } break; case OPT_PAIR0: case OPT_PAIR1: case OPT_BUS0: if ((interval >= 0) && (data == NULL)) { fatal("Option --interval requires either " "--file or --data."); } break; } switch (proto) { case OPT_REQ0: #ifdef NNG_HAVE_REQ0 rv = nng_req0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_REP0: #ifdef NNG_HAVE_REP0 rv = nng_rep0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_SUB0: #ifdef NNG_HAVE_SUB0 rv = nng_sub0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_PUB0: #ifdef NNG_HAVE_PUB0 rv = nng_pub0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_PAIR0: #ifdef NNG_HAVE_PAIR0 rv = nng_pair0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_PAIR1: #ifdef NNG_HAVE_PAIR1 rv = nng_pair1_open(&sock); #else fatal("Protocol not supported"); #endif break; case OPT_BUS0: #ifdef NNG_HAVE_BUS0 rv = nng_bus0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_PUSH0: #ifdef NNG_HAVE_PUSH0 rv = nng_push0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_PULL0: #ifdef NNG_HAVE_PULL0 rv = nng_pull0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_SURVEY0: #ifdef NNG_HAVE_SURVEYOR0 rv = nng_surveyor0_open(&sock); #else fatal("Protocol not supported."); #endif break; case OPT_RESPOND0: #ifdef NNG_HAVE_RESPONDENT0 rv = nng_respondent0_open(&sock); #else fatal("Protocol not supported"); #endif break; case 0: default: fatal("No protocol specified."); break; } if (rv != 0) { fatal("Unable to open socket: %s", nng_strerror(rv)); } for (struct topic *t = topics; t != NULL; t = t->next) { rv = nng_setopt( sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val)); if (rv != 0) { fatal("Unable to subscribe to topic %s: %s", t->val, nng_strerror(rv)); } } if ((sendtimeo > 0) && ((rv = nng_setopt_ms(sock, NNG_OPT_SENDTIMEO, sendtimeo)) != 0)) { fatal("Unable to set send timeout: %s", nng_strerror(rv)); } if ((recvtimeo > 0) && ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, recvtimeo)) != 0)) { fatal("Unable to set send timeout: %s", nng_strerror(rv)); } if ((recvmaxsz >= 0) && ((rv = nng_setopt_size(sock, NNG_OPT_RECVMAXSZ, recvmaxsz)) != 0)) { fatal("Unable to set max receive size: %s", nng_strerror(rv)); } for (struct addr *a = addrs; a != NULL; a = a->next) { char * act; nng_listener l; nng_dialer d; nng_tls_config *tls; switch (a->mode) { case OPT_DIAL: case OPT_DIAL_IPC: case OPT_DIAL_LOCAL: rv = nng_dialer_create(&d, sock, a->val); if (rv != 0) { fatal("Unable to create dialer for %s: %s", a->val, nng_strerror(rv)); } rv = nng_dialer_getopt_ptr( d, NNG_OPT_TLS_CONFIG, (void **) &tls); if (rv == 0) { configtls(tls); } else if (rv != NNG_ENOTSUP) { fatal("Unable to get TLS config: %s", nng_strerror(rv)); } if (zthome != NULL) { rv = nng_dialer_setopt(d, NNG_OPT_ZT_HOME, zthome, strlen(zthome) + 1); if ((rv != 0) && (rv != NNG_ENOTSUP)) { fatal("Unable to set ZT home: %s", nng_strerror(rv)); } } rv = nng_dialer_start(d, async); act = "dial"; if ((rv == 0) && (verbose == OPT_VERBOSE)) { char ustr[256]; size_t sz; sz = sizeof(ustr); if (nng_dialer_getopt( d, NNG_OPT_URL, ustr, &sz) == 0) { printf("Connected to: %s\n", ustr); } } break; case OPT_LISTEN: case OPT_LISTEN_IPC: case OPT_LISTEN_LOCAL: rv = nng_listener_create(&l, sock, a->val); if (rv != 0) { fatal("Unable to create listener for %s: %s", a->val, nng_strerror(rv)); } rv = nng_listener_getopt_ptr( l, NNG_OPT_TLS_CONFIG, (void **) &tls); if (rv == 0) { configtls(tls); } else if (rv != NNG_ENOTSUP) { fatal("Unable to get TLS config: %s", nng_strerror(rv)); } if (zthome != NULL) { rv = nng_listener_setopt(l, NNG_OPT_ZT_HOME, zthome, strlen(zthome) + 1); if ((rv != 0) && (rv != NNG_ENOTSUP)) { fatal("Unable to set ZT home: %s", nng_strerror(rv)); } } rv = nng_listener_start(l, async); act = "listen"; if ((rv == 0) && (verbose == OPT_VERBOSE)) { char ustr[256]; size_t sz; sz = sizeof(ustr); if (nng_listener_getopt( l, NNG_OPT_URL, ustr, &sz) == 0) { printf("Listening at: %s\n", ustr); } } break; default: fatal("Invalid address mode! (Bug!)"); } if (rv != 0) { fatal("Unable to %s on %s: %s", act, a->val, nng_strerror(rv)); } } switch (proto) { case OPT_SUB0: case OPT_PULL0: recvloop(sock); break; case OPT_REP0: case OPT_RESPOND0: if (data == NULL) { recvloop(sock); } else { resploop(sock); } break; case OPT_PUSH0: case OPT_PUB0: sendloop(sock); break; case OPT_BUS0: case OPT_PAIR0: case OPT_PAIR1: if (data != NULL) { sendrecv(sock); } else { recvloop(sock); } break; case OPT_REQ0: case OPT_SURVEY0: sendrecv(sock); break; default: fatal("Protocol handling unimplemented."); } exit(0); }