const sys = require('util');
const stomp = require('stomp');

/*
 * Send 3 messages to a subscriber
 *
 * Two STOMP clients are created against the same broker: a subscriber that
 * listens on `destination` with client acknowledgement, and a publisher that
 * sends MSG_COUNT copies of `testMessage` to it.
 *
 * Usage: node <script> [host]   (host defaults to 'localhost')
 */
const MSG_COUNT = 3;

// Connection settings common to both clients; the login is set per client.
const stomp_args = {
    port: 61613,
    host: process.argv[2] ? process.argv[2] : 'localhost',
    debug: false,
    login: "xtomp",
    passcode: "passcode",
};

const destination = "memtop-a";
const testMessage = "TestingXX\n";

// Each client gets its own copy of the settings so that choosing one client's
// login never depends on mutating an object the other client was built from.
const subscriber = new stomp.Stomp(Object.assign({}, stomp_args, { login: "sub1" }));
const publisher = new stomp.Stomp(Object.assign({}, stomp_args, { login: "publisher" }));

/**
 * Log an error frame, then tear both clients down and exit with status 1.
 *
 * The shutdown is delayed by two seconds after the error is logged.
 *
 * @param {object} error_frame - frame-like object; `headers` and `body` are
 *     printed as-is and may be undefined (the message handler passes `{}`).
 */
const errorHandler = function(error_frame) {
    console.log("error");
    console.log("headers: " + sys.inspect(error_frame.headers));
    console.log("body: " + error_frame.body);

    setTimeout(function() {
        publisher.disconnect();
        // Only one subscriber exists in this script; the identifiers
        // previously used here were undefined, which threw before
        // process.exit(1) could run.
        subscriber.disconnect();
        process.exit(1);
    }, 2000);
};

/**
 * Send MSG_COUNT copies of the test message, each requesting a "send" receipt.
 */
const publishAll = function() {
    for (let i = 0; i < MSG_COUNT; i++) {
        publisher.send({
            "destination": destination,
            "foo": "1",
            "body": testMessage,
            "receipt": "send"
        });
    }
};

// SUBSCRIBER - start

// Number of messages accepted so far.
subscriber.msg_count = 0;

subscriber.on("connected", function() {
    subscriber.subscribe({
        destination: destination,
        id: "1",
        ack: "client",
        receipt: "sub"
    });
});

subscriber.on("receipt", function(id) {
    if (id === "sub") {
        // The publisher checks this flag before sending.
        subscriber.subscribed = 1;
    } else if (id === "unsub") {
        subscriber.disconnect();
    } else {
        console.error("sub unexpected receipt " + id);
    }
});

subscriber.on("message", function(message) {
    if (testMessage !== String(message.body)) {
        console.error("wrong msg");
        console.error("want:" + testMessage);
        console.error("got:" + message.body);
        errorHandler({});
        return;
    }

    subscriber.ack(message.headers["message-id"]);

    // NOTE(review): the unsubscribe is issued on every accepted message, not
    // only the last one, and its "unsub" receipt disconnects the subscriber.
    // Kept as in the original; confirm whether it was meant to run only once
    // MSG_COUNT messages have arrived.
    subscriber.unsubscribe({
        destination: destination,
        id: "1",
        receipt: "unsub"
    });

    subscriber.msg_count += 1;
    if (subscriber.msg_count === MSG_COUNT) {
        subscriber.disconnect();
    }
});

subscriber.on("error", errorHandler);

// SUBSCRIBER - end

// PUBLISHER - start

publisher.on("connected", function() {
    if (subscriber.subscribed === 1) {
        publishAll();
    } else {
        // Subscription not confirmed yet: wait 100 ms, then send regardless.
        setTimeout(publishAll, 100);
    }
});

publisher.on("receipt", function(id) {
    // NOTE(review): disconnects on the first "send" receipt even though
    // MSG_COUNT receipts were requested; kept as in the original.
    if (id === "send") {
        publisher.disconnect();
    }
});

publisher.on("error", errorHandler);

// PUBLISHER - end

// Disconnect both clients on Ctrl-C.
process.on("SIGINT", function() {
    publisher.disconnect();
    subscriber.disconnect();
});

subscriber.connect();
publisher.connect();