Category Archives: apollo

Why do I have max limit connections with Apache Apollo and Nginx by WebSockets?

I have a problem with the following nginx config:

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

limit_conn_zone $binary_remote_addr zone=addr:10m;

server {
#....

location /ws {
    limit_conn addr 3;
    proxy_pass http://localhost:61623;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header Host $host;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection $connection_upgrade;        


    proxy_connect_timeout 20;
    proxy_send_timeout 20;
    proxy_read_timeout 20;
    send_timeout 20;
}
}

And this is my apollo.xml config:

 <connector id="ws"  bind="ws://0.0.0.0:61623"  connection_limit="2000">
     <detect timeout="10000"/>
 </connector>

After 500-600 seconds of running, the broker reaches its limit of 2000 connections, even though I have only 200 consumers. Why does this happen?
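
One hypothesis that would fit these numbers, though nothing in the post confirms it: nginx closes a proxied connection when the upstream sends nothing for proxy_read_timeout (20 seconds here), the consumers reconnect, and the broker keeps counting some of the old connections for a while. The sketch below is only back-of-the-envelope arithmetic under that assumption. The function name and the leak rate are hypothetical, not measured values.

```python
import math


def seconds_until_limit(consumers, limit, leaked_per_cycle, cycle_seconds):
    """Estimate how long until live + stale connections reach the limit.

    Assumption (hypothetical): every `cycle_seconds`, `leaked_per_cycle`
    old connections stay counted on the broker after their client has
    already reconnected through the proxy.
    """
    if leaked_per_cycle <= 0:
        return math.inf  # nothing leaks, the limit is never reached
    headroom = limit - consumers
    if headroom <= 0:
        return 0.0  # already at or above the limit
    cycles = math.ceil(headroom / leaked_per_cycle)
    return cycles * cycle_seconds


if __name__ == "__main__":
    # 200 consumers, limit 2000, 20 s timeout cycle, assumed leak of 60 per cycle
    print(seconds_until_limit(200, 2000, 60, 20))
```

With 200 consumers and a limit of 2000, a leak of about 60 connections per 20-second cycle would reach the limit after 600 seconds. Comparing the connection count reported by the broker with the number of active connections seen by nginx would confirm or rule this out.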

Apache Apollo MQTT not allowing more than 20K connections

We are trying to size Apache Apollo for the maximum number of connections it can support. We tried various platforms and, regardless of their CPU/core/RAM capacity, only 20K connections can be opened. After that, new connections are opened at a very slow rate (1 per 30 seconds or so).

We have done all the tuning specified in this thread: Max MQTT connections

We also increased the Apollo JVM memory to 4G and tried running the MQTT clients from different hosts.

When this problem occurs, there are no symptoms on the server side. CPU usage, memory, everything is normal.

Note: The MQTT client we use is a simple JAR created using the Paho library.

Are we missing anything?
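
One thing that may be worth re-checking, even if the tuning thread already covered it, is the open-file limit that the broker process actually inherits, since every connection consumes a file descriptor. The sketch below reads the limit of the current process with Python's standard resource module (Unix only). Run from the same shell and user that start Apollo, it gives a hint of what the broker inherits. The helper names and the reserved margin of 256 descriptors are assumptions for illustration.

```python
import resource


def fd_limits():
    """Return the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def can_hold(connections, reserved=256):
    """Check whether the soft limit leaves room for `connections` sockets.

    `reserved` is a hypothetical margin for log files, JARs and other
    descriptors the process needs besides client sockets.
    """
    soft, _hard = fd_limits()
    if soft == resource.RLIM_INFINITY:
        return True  # no limit configured
    return soft - reserved >= connections


if __name__ == "__main__":
    soft, hard = fd_limits()
    print(f"soft={soft} hard={hard}")
    print("room for 20000 connections:", can_hold(20000))
```

This only reports limits for the process that runs it, so it is a starting point rather than proof of what the running broker sees.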

Apache Apollo Connection Refused

I have installed Apache Apollo and it is running successfully. I can access its web dashboard at http://127.0.0.1:61680/.

I want to connect to this via STOMP. I'm using the library [StompKit][1] to do this.

private let host: String = "127.0.0.1"
private let port: UInt = 61680

private var client: STOMPClient!

override func viewDidLoad() {
    super.viewDidLoad()

    client = STOMPClient(host: host, port: port)
    connect()
}

Connecting fails with the error Connection refused.

This is the apollo.xml file.

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version
  2.0 (the "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0 Unless required by
  applicable law or agreed to in writing, software distributed under
  the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
  OR CONDITIONS OF ANY KIND, either express or implied. See the
  License for the specific language governing permissions and
  limitations under the License.
-->

<!--
  For more information on how configure this file please
  reference:

  http://activemq.apache.org/apollo/versions/1.7.1/website/documentation/user-manual.html
  -->
<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">

  <notes>
    The default configuration with tls/ssl enabled.
  </notes>

  <log_category console="console" security="security" connection="connection" audit="audit"/>


  <authentication domain="apollo"/>
  <!-- Give admins full access -->
  <access_rule allow="admins" action="*"/>
  <access_rule allow="*" action="connect" kind="connector"/>


  <virtual_host id="mybroker">
    <!--
      You should add all the host names that this virtual host is known as
      to properly support the STOMP 1.1 virtual host feature.
      -->
    <host_name>mybroker</host_name>
    <host_name>localhost</host_name>
    <host_name>127.0.0.1</host_name>

    <!-- Uncomment to disable security for the virtual host -->
    <!-- <authentication enabled="false"/> -->

    <!-- Uncomment to disable security for the virtual host -->
    <!-- <authentication enabled="false"/> -->
    <access_rule allow="users" action="connect create destroy send receive consume"/>


    <!-- You can delete this element if you want to disable persistence for this virtual host -->
    <leveldb_store directory="${apollo.base}/data"/>


  </virtual_host>

  <web_admin bind="http://127.0.0.1:61680"/>
  <web_admin bind="https://127.0.0.1:61681"/>

  <connector id="tcp" bind="tcp://0.0.0.0:61613" connection_limit="2000"/>
  <connector id="tls" bind="tls://0.0.0.0:61614" connection_limit="2000"/>
  <connector id="ws"  bind="ws://0.0.0.0:61623"  connection_limit="2000"/>
  <connector id="wss" bind="wss://0.0.0.0:61624" connection_limit="2000"/>

  <key_storage file="${apollo.base}/etc/keystore" password="password" key_password="password"/>

</broker>

Attempt 1:

The only answer I could find related to this is [this][2] (although it's for MQTT). Anyway, I modified the apollo.xml file by adding the authentication line.

<virtual_host id="mybroker">
    <authentication enabled="false"/>
...

But that changed nothing.

Attempt 2:

The web dashboard has a username/password login. So I tried a different method of StompKit.

client.connectWithLogin("admin", passcode: "password") { connectedFrame, error in
    if let error = error {
        print("Error during connecting: \(error.localizedDescription)")
    } else {
        print("Connected!")
    }
}

Still the error persists.

What am I doing wrong here?
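
It may be worth comparing the port used by the client with the listeners declared in the posted apollo.xml. The sketch below parses a trimmed copy of that config (only the web_admin and connector elements, copied from above) and shows which kind of listener owns each port. The helper names are hypothetical.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the listeners from the apollo.xml shown above.
APOLLO_XML = """\
<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
  <web_admin bind="http://127.0.0.1:61680"/>
  <web_admin bind="https://127.0.0.1:61681"/>
  <connector id="tcp" bind="tcp://0.0.0.0:61613" connection_limit="2000"/>
  <connector id="tls" bind="tls://0.0.0.0:61614" connection_limit="2000"/>
  <connector id="ws"  bind="ws://0.0.0.0:61623"  connection_limit="2000"/>
  <connector id="wss" bind="wss://0.0.0.0:61624" connection_limit="2000"/>
</broker>
"""

NS = {"a": "http://activemq.apache.org/schema/activemq/apollo"}


def listeners(xml_text):
    """Return (kind, bind URL, port) for every listener in the config."""
    root = ET.fromstring(xml_text)
    found = []
    for tag in ("web_admin", "connector"):
        for element in root.findall(f"a:{tag}", NS):
            bind = element.get("bind")
            port = int(bind.rsplit(":", 1)[1])
            found.append((tag, bind, port))
    return found


def kind_for_port(xml_text, port):
    """Return (kind, bind URL) of the listener on `port`, or None."""
    for kind, bind, listener_port in listeners(xml_text):
        if listener_port == port:
            return kind, bind
    return None


if __name__ == "__main__":
    for kind, bind, port in listeners(APOLLO_XML):
        print(f"{port}: {kind} ({bind})")
```

In this config, 61680 belongs to web_admin, which is the HTTP dashboard, while the plain TCP connector is bound to 61613, the port a STOMP client would normally use. Whether that alone explains the refused connection also depends on where the client runs relative to 127.0.0.1.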

Error creating an Apache Apollo broker

I downloaded and unzipped the Apache Apollo distribution as described in their site.

~/Developer/Web/MQTT/apache-apollo-1.7.1/bin/apollo create mybroker

I got the output below in the Terminal.

Creating apollo instance at: mybroker

ERROR: mybroker/etc/log4j.properties (No such file or directory)

That command is supposed to create the etc subdirectory, among others.

Any idea why this error is occurring?

Apache Apollo on Ubuntu 14.04 – Can not connect to Websocket (Thread issue?)

I deployed the MyBroker service on an Ubuntu 14.04 virtual machine and get the following logs.

apollo.log

2016-03-07 13:52:28,301 | INFO  | java.io.IOException: Connection reset by peer | 1535101acf6
2016-03-07 13:52:28,301 | INFO  | java.io.IOException: Connection reset by peer | 1535101acf8
2016-03-07 13:52:28,302 | INFO  | java.io.IOException: Connection reset by peer | 1535101acfa
2016-03-07 13:52:28,302 | INFO  | java.io.IOException: Connection reset by peer | 1535101acfc
2016-03-07 13:52:28,303 | INFO  | java.io.IOException: Connection reset by peer | 1535101acfe
2016-03-07 13:52:28,303 | INFO  | java.io.IOException: Connection reset by peer | 1535101ad00
2016-03-07 13:52:28,303 | INFO  | java.io.IOException: Connection reset by peer | 1535101ad02
2016-03-07 13:52:37,923 | INFO  | java.io.IOException: Connection reset by peer | 1535101ad04
2016-03-07 13:53:07,938 | INFO  | java.io.IOException: Connection reset by peer | 1535101ad06
2016-03-07 13:53:27,963 | INFO  | java.io.IOException: Connection reset by peer | 1535101ad08

connection.log

2016-03-07 13:53:22,817 connected: local:/127.0.0.1:61613, remote:/127.0.0.1:53221
2016-03-07 13:53:22,825 connected: local:/127.0.0.1:61613, remote:/127.0.0.1:53222
2016-03-07 13:53:22,847 connected: local:/127.0.0.1:61613, remote:/127.0.0.1:53222
2016-03-07 13:53:22,852 connected: local:/127.0.0.1:61613, remote:/127.0.0.1:53221
2016-03-07 13:53:27,045 disconnected: local:/127.0.0.1:61613, remote:/127.0.0.1:53222
2016-03-07 13:53:27,963 Shutting connection '/127.0.0.1:53221'  down due to: java.io.IOException: Connection reset by peer
2016-03-07 13:53:27,964 disconnected: local:/127.0.0.1:61613, remote:/127.0.0.1:53221

stacktrace.log

2016-03-04 14:31:40,305 | WARN  | stackref=153416c3084
java.lang.ArrayIndexOutOfBoundsException: 32
    at org.fusesource.mqtt.codec.SUBSCRIBE.decode(SUBSCRIBE.java:65)
    at org.apache.activemq.apollo.mqtt.MqttSession.on_transport_command(MqttSession.java:334)
    at org.apache.activemq.apollo.mqtt.MqttSession$3.call(MqttSession.java:142)
    at org.apache.activemq.apollo.util.UnitFn1.apply(Scala2JavaHelper.scala:41)
    at org.apache.activemq.apollo.mqtt.MqttProtocolHandler.on_transport_command(MqttProtocolHandler.java:377)
    at org.apache.activemq.apollo.broker.BrokerConnection.on_transport_command(Connection.scala:144)
    at org.apache.activemq.apollo.broker.Connection$$anon$1.onTransportCommand(Connection.scala:71)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport.drain_inbound(WebSocketTransportFactory.scala:480)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport$$anonfun$onMessage$1.apply$mcV$sp(WebSocketTransportFactory.scala:403)
    at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:357)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
2016-03-04 14:31:44,370 | WARN  | stackref=153416c3088
java.lang.ArrayIndexOutOfBoundsException: 32
    at org.fusesource.mqtt.codec.SUBSCRIBE.decode(SUBSCRIBE.java:65)
    at org.apache.activemq.apollo.mqtt.MqttSession.on_transport_command(MqttSession.java:334)
    at org.apache.activemq.apollo.mqtt.MqttSession$3.call(MqttSession.java:142)
    at org.apache.activemq.apollo.util.UnitFn1.apply(Scala2JavaHelper.scala:41)
    at org.apache.activemq.apollo.mqtt.MqttProtocolHandler.on_transport_command(MqttProtocolHandler.java:377)
    at org.apache.activemq.apollo.broker.BrokerConnection.on_transport_command(Connection.scala:144)
    at org.apache.activemq.apollo.broker.Connection$$anon$1.onTransportCommand(Connection.scala:71)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport.drain_inbound(WebSocketTransportFactory.scala:480)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport$$anonfun$onMessage$1.apply$mcV$sp(WebSocketTransportFactory.scala:403)
    at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:357)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
2016-03-07 08:47:06,569 | WARN  | stackref=153416c320c
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 08:47:07,227 | WARN  | stackref=153416c320e
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:35:08,660 | WARN  | stackref=153416c3319
java.lang.NullPointerException
    at org.apache.activemq.apollo.broker.DeliveryProducerRoute$$anonfun$unbind$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$4.apply(Router.scala:258)
    at org.apache.activemq.apollo.broker.DeliveryProducerRoute$$anonfun$unbind$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$4.apply(Router.scala:256)
    at scala.collection.TraversableLike$$anonfun$filterNot$1.apply(TraversableLike.scala:274)
    at scala.collection.TraversableLike$$anonfun$filterNot$1.apply(TraversableLike.scala:274)
    at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
    at scala.collection.immutable.List.foreach(List.scala:309)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
    at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:274)
    at scala.collection.AbstractTraversable.filterNot(Traversable.scala:105)
    at org.apache.activemq.apollo.broker.DeliveryProducerRoute$$anonfun$unbind$1$$anonfun$apply$mcV$sp$4.apply(Router.scala:256)
    at org.apache.activemq.apollo.broker.DeliveryProducerRoute$$anonfun$unbind$1$$anonfun$apply$mcV$sp$4.apply(Router.scala:251)
    at scala.collection.TraversableLike$$anonfun$filterNot$1.apply(TraversableLike.scala:274)
    at scala.collection.TraversableLike$$anonfun$filterNot$1.apply(TraversableLike.scala:274)
    at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
    at scala.collection.immutable.List.foreach(List.scala:309)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
    at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:274)
    at scala.collection.AbstractTraversable.filterNot(Traversable.scala:105)
    at org.apache.activemq.apollo.broker.DeliveryProducerRoute$$anonfun$unbind$1.apply$mcV$sp(Router.scala:251)
    at org.apache.activemq.apollo.util.DeferringDispatched$$anon$1.run(Dispatched.scala:38)
    at org.apache.activemq.apollo.util.DeferringDispatched$$anonfun$dispatch_queue_task_source$1$$anonfun$apply$mcV$sp$1.apply(Dispatched.scala:45)
    at org.apache.activemq.apollo.util.DeferringDispatched$$anonfun$dispatch_queue_task_source$1$$anonfun$apply$mcV$sp$1.apply(Dispatched.scala:45)
    at scala.collection.immutable.List.foreach(List.scala:309)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at org.apache.activemq.apollo.util.DeferringDispatched$$anonfun$dispatch_queue_task_source$1.apply$mcV$sp(Dispatched.scala:45)
    at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:357)
    at org.fusesource.hawtdispatch.internal.HawtCustomDispatchSource$1.run(HawtCustomDispatchSource.java:127)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
2016-03-07 09:37:13,687 | WARN  | stackref=153416c334d
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:37:14,493 | WARN  | stackref=153416c334f
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:37:15,274 | WARN  | stackref=153416c3351
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:37:16,555 | WARN  | stackref=153416c3353
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:38:49,275 | WARN  | stackref=153416c335f
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:38:50,234 | WARN  | stackref=153416c3361
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:38:50,990 | WARN  | stackref=153416c3363
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:38:51,752 | WARN  | stackref=153416c3365
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 09:38:52,515 | WARN  | stackref=153416c3367
java.lang.ArrayIndexOutOfBoundsException
2016-03-07 10:48:21,306 | WARN  | stackref=1535072b8fc
java.lang.ArrayIndexOutOfBoundsException: 80
    at org.fusesource.mqtt.codec.SUBSCRIBE.decode(SUBSCRIBE.java:65)
    at org.apache.activemq.apollo.mqtt.MqttSession.on_transport_command(MqttSession.java:334)
    at org.apache.activemq.apollo.mqtt.MqttSession$3.call(MqttSession.java:142)
    at org.apache.activemq.apollo.util.UnitFn1.apply(Scala2JavaHelper.scala:41)
    at org.apache.activemq.apollo.mqtt.MqttProtocolHandler.on_transport_command(MqttProtocolHandler.java:377)
    at org.apache.activemq.apollo.broker.BrokerConnection.on_transport_command(Connection.scala:144)
    at org.apache.activemq.apollo.broker.Connection$$anon$1.onTransportCommand(Connection.scala:71)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport.drain_inbound(WebSocketTransportFactory.scala:480)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport$$anonfun$onMessage$1.apply$mcV$sp(WebSocketTransportFactory.scala:403)
    at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:357)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
2016-03-07 10:51:57,043 | WARN  | stackref=1535072b916
java.lang.ArrayIndexOutOfBoundsException: 80
    at org.fusesource.mqtt.codec.SUBSCRIBE.decode(SUBSCRIBE.java:65)
    at org.apache.activemq.apollo.mqtt.MqttSession.on_transport_command(MqttSession.java:334)
    at org.apache.activemq.apollo.mqtt.MqttSession$3.call(MqttSession.java:142)
    at org.apache.activemq.apollo.util.UnitFn1.apply(Scala2JavaHelper.scala:41)
    at org.apache.activemq.apollo.mqtt.MqttProtocolHandler.on_transport_command(MqttProtocolHandler.java:377)
    at org.apache.activemq.apollo.broker.BrokerConnection.on_transport_command(Connection.scala:144)
    at org.apache.activemq.apollo.broker.Connection$$anon$1.onTransportCommand(Connection.scala:71)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport.drain_inbound(WebSocketTransportFactory.scala:480)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport$$anonfun$onMessage$1.apply$mcV$sp(WebSocketTransportFactory.scala:403)
    at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:357)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
2016-03-07 13:30:06,035 | WARN  | stackref=1535101abf7
java.lang.ArrayIndexOutOfBoundsException: 32
    at org.fusesource.mqtt.codec.SUBSCRIBE.decode(SUBSCRIBE.java:65)
    at org.apache.activemq.apollo.mqtt.MqttSession.on_transport_command(MqttSession.java:334)
    at org.apache.activemq.apollo.mqtt.MqttSession$3.call(MqttSession.java:142)
    at org.apache.activemq.apollo.util.UnitFn1.apply(Scala2JavaHelper.scala:41)
    at org.apache.activemq.apollo.mqtt.MqttProtocolHandler.on_transport_command(MqttProtocolHandler.java:377)
    at org.apache.activemq.apollo.broker.BrokerConnection.on_transport_command(Connection.scala:144)
    at org.apache.activemq.apollo.broker.Connection$$anon$1.onTransportCommand(Connection.scala:71)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport.drain_inbound(WebSocketTransportFactory.scala:480)
    at org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory$WebSocketTransport$$anonfun$onMessage$1.apply$mcV$sp(WebSocketTransportFactory.scala:403)
    at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:357)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)

My assumption is that the virtual machine with Ubuntu 14.04 (and 1.5 GB RAM) cannot handle all the services (other services, such as Tomcat, are also running on the same machine). Does anyone have an idea based on the posted log files? (I only see a NullPointerException for the threads in stacktrace.log.)
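
Most of the stack traces above end in SUBSCRIBE.decode, reached through the WebSocket transport. One possible reading, and it is only a guess, is that the bytes handed to the decoder do not form a complete, well-framed SUBSCRIBE packet. To illustrate where such a decoder can run off the end of its buffer, here is a minimal sketch of the MQTT 3.1.1 SUBSCRIBE layout (packet identifier, then length-prefixed topic filters each followed by a QoS byte). This is not the fusesource or Apollo implementation. The function and exception names are hypothetical.

```python
import struct


class MalformedPacket(ValueError):
    """Raised when the bytes do not form a complete SUBSCRIBE body."""


def decode_subscribe(body: bytes):
    """Decode the variable header and payload of an MQTT 3.1.1 SUBSCRIBE.

    `body` is everything after the fixed header.
    Returns (packet_id, [(topic_filter, qos), ...]).
    """
    if len(body) < 2:
        raise MalformedPacket("missing packet identifier")
    (packet_id,) = struct.unpack_from(">H", body, 0)
    pos = 2
    topics = []
    while pos < len(body):
        if pos + 2 > len(body):
            raise MalformedPacket("truncated topic length")
        (length,) = struct.unpack_from(">H", body, pos)
        pos += 2
        # The topic needs `length` bytes, plus one byte for the QoS.
        # Without this check, a bad length indexes past the buffer.
        if pos + length + 1 > len(body):
            raise MalformedPacket("topic length exceeds packet size")
        topic = body[pos:pos + length].decode("utf-8")
        pos += length
        qos = body[pos]
        pos += 1
        topics.append((topic, qos))
    if not topics:
        raise MalformedPacket("SUBSCRIBE without topic filters")
    return packet_id, topics


if __name__ == "__main__":
    packet = struct.pack(">H", 1) + struct.pack(">H", 3) + b"a/b" + b"\x01"
    print(decode_subscribe(packet))
```

Feeding the same bytes with the last byte cut off raises MalformedPacket instead of an index error, which is the kind of bounds check that separates a clean protocol error from an ArrayIndexOutOfBoundsException.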

Conversion from MQTT to AMQP/STOMP

I'm fairly new to these protocols and have just started exploring message brokers like Apache Apollo and RabbitMQ. My broker receives MQTT messages from a publisher, and I would like to convert them to AMQP (preferably) or STOMP to send to a web server, but so far I've been unable to do so. I looked into RabbitMQ and tried enabling the MQTT plugin, but when I load it, I'm unable to start the server. I've been able to load other plugins easily, such as STOMP and the management utilities, which is why I'm confused about RabbitMQ. Can anyone guide me here? Is there an API that can help?
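
To make the conversion concrete, here is a minimal sketch of what turning an MQTT message into a STOMP SEND frame could look like at the byte level: a command line, headers, a blank line, the body, and a terminating NUL byte. The topic-to-destination mapping (a /topic/ prefix, with "/" replaced by ".") is an assumption for illustration and should be checked against the broker in use. The function names are hypothetical.

```python
def escape_header(value: str) -> str:
    """Escape a header value as required by STOMP 1.2."""
    return (
        value.replace("\\", "\\\\")
        .replace("\r", "\\r")
        .replace("\n", "\\n")
        .replace(":", "\\c")
    )


def mqtt_topic_to_stomp_destination(topic: str) -> str:
    """Map an MQTT topic to a STOMP destination (assumed convention)."""
    return "/topic/" + topic.replace("/", ".")


def stomp_send_frame(destination: str, body: bytes) -> bytes:
    """Build a STOMP SEND frame carrying `body` unchanged."""
    headers = [
        ("destination", destination),
        ("content-type", "application/octet-stream"),
        # content-length lets the body contain NUL bytes safely
        ("content-length", str(len(body))),
    ]
    head = "SEND\n"
    for name, value in headers:
        head += f"{name}:{escape_header(value)}\n"
    head += "\n"  # blank line separates headers from body
    return head.encode("utf-8") + body + b"\x00"


if __name__ == "__main__":
    destination = mqtt_topic_to_stomp_destination("sensors/temp")
    print(stomp_send_frame(destination, b"21.5"))
```

A broker that speaks several protocols on the same destinations may make this kind of manual bridging unnecessary, in which case a subscriber can simply connect with the other protocol.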