Skip to content
gabrieltholsgard edited this page Nov 5, 2013 · 8 revisions

Index

  1. How to subscribe
  2. How to publish
  3. How to use a web-socket
  4. Concepts
  5. Overview
  6. Program Structure
  7. Clients
  8. RabbitMQ

How to subscribe

Top
In the system each stream/virtual stream has it own exchange in RabbitMQ and clients subscribe to these exchanges by hooking up queues to them. To subscribe you need to connect to the exchange, this is an example how to do it in an Erlang module:

    %% Connect  
    {ok, Connection} =
            amqp_connection:start(#amqp_params_network{host = ?IP_ADDRESS}),
    
    %% Open channel
    {ok, ChannelIn} = amqp_connection:open_channel(Connection),

    %% Declare queue
    amqp_channel:call(ChannelIn, #'exchange.declare'{exchange = ?EXCHANGE, type = ?TYPE}),
    #'queue.declare_ok'{queue = QueueIn} = amqp_channel:call(ChannelIn, #'queue.declare'{exclusive = true}),
    amqp_channel:call(ChannelIn, #'queue.bind'{exchange = ?EXCHANGE, queue = QueueIn}),

    %% Subscribe to INPUT queue
    amqp_channel:subscribe(ChannelIn, #'basic.consume'{queue = QueueIn, no_ack = true}, self()),
    receive
            {#'basic.deliver'{}, #amqp_msg{payload = Body}} ->
                  // Do something with the data
    end.

For further details check the documentation here

How to subscribe via Node.js using rabbit.js:

    var sub = context.socket('SUB');
    sub.connect(?EXCHANGE);
    sub.on('data', function(data) {
            // Do something with the data
    });

For further details check the documentation here

How to publish

Top
When a data point is published it is published to an exchange and sent to the queues in a specified manner, this is an example of doing a fanout in an Erlang module:

    %% Connect
    {ok, Connection} =
            amqp_connection:start(#amqp_params_network{host = ?IP_ADDRESS}),
    %% Open channel
    {ok, ChannelOut} = amqp_connection:open_channel(Connection),
    %% Declare exchange
    amqp_channel:call(ChannelOut, #'exchange.declare'{exchange = ?EXCHANGE, type = <<"fanout">>}),
    %% Publish data point
    amqp_channel:cast(Channel, #'basic.publish'{exchange = ?EXCHANGE}, #amqp_msg{payload = ?DATAPOINT}).

For further details check the documentation here
Check the documentation here for how to publish via Node.js

How to use a web-socket

Top
The system allows subscriptions via web-sockets, which is used to update web pages in real time. To let a webpage connect via a web-socket Socket.IO is used.
Here is an example of how to connect to a web-socket:

    var socket = io.connect(?IP_ADDRESS);
    socket.on(?MSG_TYPE, function(data) {
        // Do something with the data
    });

For further details look in the documentation here
Tip: If you want to connect several clients on different things through one web-socket, consider using namespaces, a functionality provided in Socket.IO.

##Concepts Top

  • Streams. A stream is a flow of data. A user can subscribe to a stream.
  • Virtual Streams. A virtual stream subscribes to one or several streams/virtual streams and process incoming data according to a defined function, which could be a prediction or some other aggregation of data. A user can subscribe to a virtual stream.
  • Prediction. Currently, the predictions are calculated using R.
  • Data points. A data point is a data value for a stream/virtual stream. When the system receives a data point from a external resources in Json format, the system parse it and transform it to a data point.

Overview

Top
When a new data point arrives for a stream, it is published in the pub/sub system and distributed to all clients who have subscribed to the stream. The system also supports dynamic visualization and prediction. RabbitMQ has been utilized to implement the back-end messaging. Node.js and Socket.IO are used to interact with the web pages via web-sockets.

##Program Structure Top

  • virtualStreamProcess.erl The implementation of the virtual stream, it may contain the prediction functionality and its local data store in the future. Also the virtual stream may do some data processing in the final product.
  • resourceProcessMock.erl Test replacement for the coming pollers and parsers, this is a process which generate some random data and push them into the pub/sub system so we could test our code.
  • streamProcess.erl Implementation of a stream process. Each stream process is associated with one stream and handles to push data both to the DB and to the pub/sub system.
  • datapoints_resource.erl handles a data point when provided via the API.

##Clients Top

  • Web pages Pub/Sub system could push data to webpages which enable dynamic visualization.
  • Sessions Inform a logged on user about their subscriptions.
  • Users Inform logged of users about their subscriptions.

##RabbitMQ Top
RabbitMQ is a message broker and provides robust messaging for many successful commercial websites, including Twitter. It runs on a majority of operating systems and is easy to use. RabbitMQ offers client`s libraries for many mainstream programming languages, including Erlang which we are using in this project. RabbitMQ is AMQP based protocol and the following are essential concepts:

  • Queues. A queue is used to cache incoming messages and a client could fetch messages from the queue.
  • Exchange. A exchange is where data points arrive and distribute to the connected queues according to some rule.
  • Publisher/Subscriber. A publisher is something that push data into the pub/sub system and a subscriber is something that listens to specific data that comes in to the pub/sub system.

If you would like to know more about RabbitMQ, please visit its official website.

Clone this wiki locally