var sys = require('util');
var stomp = require('stomp');

/*
 * Send same message to N subscribers.
 *
 * The script opens `n` subscriber connections to one destination, waits for
 * them to subscribe, then connects a single publisher that sends one message.
 * Every subscriber checks the body, acks the message, unsubscribes and
 * disconnects once the unsubscribe receipt arrives.
 *
 * Usage: node <script> [host]      (host defaults to 'localhost')
 */

// Base connection options. Never mutated after this point: each client gets
// its own copy via argsWithLogin() so one client's login cannot leak into
// another client's options.
var stomp_args = {
    port: 61613,
    host: process.argv[2] ? process.argv[2] : 'localhost',
    debug: false,
    login: "subn",
    passcode: "passcode",
};

var destination = "memtop-a";
var testMessage = "Testing99\n";
var n = 100;              // number of subscribers to create
var subscribers = [];     // every subscriber client, in creation order
var subscribed = 0;       // count of "sub" receipts seen so far

/**
 * Build a fresh options object based on stomp_args with the given login.
 *
 * @param {string} login - login name to place in the copy.
 * @returns {Object} a new object; stomp_args itself is left untouched.
 */
function argsWithLogin(login) {
    var copy = {};
    Object.keys(stomp_args).forEach(function (key) {
        copy[key] = stomp_args[key];
    });
    copy.login = login;
    return copy;
}

// Single publisher client, created with its own "pubn" login.
var publisher = new stomp.Stomp(argsWithLogin("pubn"));

/**
 * Disconnect the publisher and every subscriber created so far.
 */
function disconnectAll() {
    publisher.disconnect();
    subscribers.forEach(function (client) {
        client.disconnect();
    });
}

/**
 * Log an error frame, then tear everything down and exit with status 1
 * after a 2 second delay.
 *
 * Also invoked with an empty object on a message-body mismatch, in which
 * case headers and body are logged as undefined.
 *
 * @param {Object} error_frame - object with optional `headers` and `body`.
 */
var errorHandler = function (error_frame) {
    console.log("error");
    console.log("headers: " + sys.inspect(error_frame.headers));
    console.log("body: " + error_frame.body);

    setTimeout(function () {
        disconnectAll();
        console.log("test timeout");
        process.exit(1);
    }, 2000);
};

/**
 * Create subscriber number `i`, wire up its event handlers and start
 * connecting. The client is appended to `subscribers`.
 *
 * @param {number} i - index of the subscriber; also used as subscription id.
 */
var connect = function (i) {
    var subscriber = new stomp.Stomp(argsWithLogin("subn"));
    subscribers.push(subscriber);
    subscriber.n = i;

    subscriber.on("connected", function () {
        // The "sub" receipt lets the receipt handler below count
        // completed subscriptions.
        subscriber.subscribe({
            destination: destination,
            id: String(subscriber.n),
            ack: "client",
            receipt: "sub"
        });
    });

    subscriber.on("receipt", function (id) {
        if (id === "sub") {
            subscribed++;
        } else if (id === "unsub") {
            // Unsubscribe confirmed: this subscriber is finished.
            subscriber.disconnect();
        } else {
            console.error("sub unexpected receipt " + id);
        }
    });

    subscriber.on("message", function (message) {
        if (testMessage !== "" + message.body) {
            console.error("wrong msg");
            console.error("want:" + testMessage);
            console.error("got:" + message.body);
            errorHandler({});
            return;
        }

        // The subscription uses ack: "client", so ack explicitly before
        // unsubscribing.
        subscriber.ack(message.headers["message-id"]);
        subscriber.unsubscribe({
            destination: destination,
            id: String(subscriber.n),
            receipt: "unsub"
        });
    });

    subscriber.on("error", errorHandler);
    subscriber.connect();
};

var i;
for (i = 0; i < n; i++) {
    connect(i);
}

// PUBLISHER - start

/**
 * Publish the test message, requesting a "send" receipt.
 */
function sendTestMessage() {
    publisher.send({
        "destination": destination,
        "foo": "1",
        "body": testMessage,
        "receipt": "send"
    });
}

publisher.on("connected", function () {
    if (subscribed === n) {
        sendTestMessage();
    } else {
        // Not every subscription has been confirmed yet: wait a while
        // longer, then send regardless of how many have subscribed.
        setTimeout(sendTestMessage, 100 * n);
    }
});

publisher.on("receipt", function (id) {
    if (id === "send") {
        publisher.disconnect();
    }
});

publisher.on("error", errorHandler);

// PUBLISHER - end

process.on("SIGINT", disconnectAll);

// Delay the publisher's connect so the subscribers have time to connect and
// subscribe first.
setTimeout(function () {
    publisher.connect();
}, 25 * n);