Erik's Engineering

something alliterative

Tailing log output over the web via RabbitMQ

Why Queue?

Sometimes you want two different parts of a program to communicate with each other. You can open a socket and talk directly, or perhaps use a pipe if they're on the same system. However, both of those are a bit of a pain, and both require the speaker and the listener to be running at the same time. If one of them goes down, the communication doesn't happen.

That's where queuing software comes in. It lets you write to a queue from one process and read from it some time later from another, without having to make everything sync up. If the reader restarts, your messages are still waiting for it. Queue servers may also let several readers each receive every write, which is handy for things like broadcasting log output to everyone who's watching.

What software?

I happen to like RabbitMQ for my queuing software. It's moderately lightweight, fairly easy to get running on Linux, and can harden messages and queues to disk.

Hardening (RabbitMQ's terms are durable queues and exchanges, and persistent messages) is important if you want your messages to survive a queue server restart. That isn't important for every queue (it won't be for the example in this post), but I think it's best to use a queue server that supports it so you don't have to switch servers when you come across something that needs it.

Carrot is a Ruby client for RabbitMQ. Unlike some other Ruby RabbitMQ clients, it's designed for synchronous use: it doesn't use EventMachine, so it's easy to use in conjunction with Passenger. I'll be using it for this example.

AMQP Basics

RabbitMQ implements AMQP (the Advanced Message Queuing Protocol), a protocol for communicating with queuing servers. AMQP defines a model for managing queues that's a little more complicated than just saying "here's a queue, write to it".

In general, you write a message and routing key to an exchange. You read from a queue. You bind a queue to an exchange so that the queue will receive messages written to the exchange. Your binding may filter messages based on the routing key.

That may seem overcomplicated, but it lets you do some neat tricks. By binding multiple queues to a single exchange, you can have them all get whatever messages you write. Bindings that filter on the routing key let you send only certain messages to a particular queue. You might, for instance, have an exchange that receives all your log messages and individual queues that get messages at different log levels.
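The exchange/binding/queue split is easier to see in something you can run without a server. Here's a toy, in-memory model of a topic exchange; it is not Carrot's API and not how RabbitMQ is implemented, and ToyTopicExchange and its methods are made-up names. It assumes RabbitMQ's topic-exchange rules: routing keys are dot-separated words, and in a binding key `*` matches exactly one word while `#` matches zero or more. The usage at the bottom sets up the log-level example: one queue sees every log message, another sees only errors.

```ruby
# A toy, in-memory model of AMQP topic routing (not Carrot, not RabbitMQ).
# ToyTopicExchange and its methods are hypothetical names for illustration.
class ToyTopicExchange
  def initialize
    @bindings = [] # list of [binding_key, queue] pairs
  end

  # Bind a queue (just an Array here) using a key such as "log.error" or "log.#".
  def bind(queue, binding_key)
    @bindings << [binding_key, queue]
  end

  # Copy the message into every queue with at least one matching binding.
  # Each queue gets it at most once; if nothing matches, the message is dropped.
  def publish(message, routing_key)
    words = routing_key.split(".")
    matching = @bindings.select { |key, _queue| self.class.match?(key.split("."), words) }
    matching.map(&:last).uniq(&:object_id).each { |queue| queue << message }
  end

  # "*" matches exactly one word; "#" matches zero or more words.
  def self.match?(pattern, words)
    return words.empty? if pattern.empty?
    head, *rest = pattern
    case head
    when "#" then (0..words.length).any? { |i| match?(rest, words.drop(i)) }
    when "*" then !words.empty? && match?(rest, words.drop(1))
    else !words.empty? && words.first == head && match?(rest, words.drop(1))
    end
  end
end

# One exchange for all log messages; one queue gets everything, one only errors.
logs = ToyTopicExchange.new
everything = []
errors = []
logs.bind(everything, "log.#")
logs.bind(errors, "log.error")

logs.publish("disk full", "log.error")
logs.publish("user logged in", "log.info")

p everything # prints ["disk full", "user logged in"]
p errors     # prints ["disk full"]
```

With a real server the queues would be RabbitMQ queues and the messages would wait there until a reader pops them, but the routing decision follows the same rules.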

Queues are cheap, so you'd probably make a queue for each client that's reading those log messages. That way, even if Alice and Bob are both watching the logs, they'll both see all the messages.

If you want something like a work queue, where multiple workers share the jobs, you'll want a single queue that all of them read from. RabbitMQ hands each message in a queue to only one consumer, so each job is delivered to just one worker.
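To make the difference between the two setups concrete, here's a toy sketch with plain Ruby arrays standing in for queues (fan_out and share_work are hypothetical helpers, not part of Carrot or AMQP). With a queue per viewer, Alice and Bob each get every line; with one shared queue, each job lands with exactly one worker. The turn-taking in share_work is a simplification; a real broker decides which consumer receives each message.

```ruby
# Toy, in-memory contrast of two AMQP usage patterns (hypothetical helpers, no broker).

# One queue per viewer, all bound to the same exchange: every viewer
# gets its own copy of every message.
def fan_out(messages, viewer_queues)
  messages.each do |message|
    viewer_queues.each_value { |queue| queue << message }
  end
  viewer_queues
end

# One queue shared by several workers: each message is taken by exactly one
# worker. Workers simply take turns here; a real broker picks the consumer.
def share_work(messages, worker_names)
  raise ArgumentError, "need at least one worker" if worker_names.empty?
  queue = messages.dup
  done = Hash.new { |hash, worker| hash[worker] = [] }
  until queue.empty?
    worker_names.each do |worker|
      break if queue.empty?
      done[worker] << queue.shift
    end
  end
  done
end

viewers = fan_out(%w[line1 line2], { "alice" => [], "bob" => [] })
p viewers["alice"] # prints ["line1", "line2"]
p viewers["bob"]   # prints ["line1", "line2"]

jobs = share_work(%w[job1 job2 job3], %w[worker1 worker2])
p jobs["worker1"]  # prints ["job1", "job3"]
p jobs["worker2"]  # prints ["job2"]
```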

An Example

For our example, we're going to set up a simple reader/writer pair to watch some system statistics on the server running our app. We'll complicate things a little bit by wanting to watch them in our browser.

First, we need a source of data. For that, we'll just use the normal vmstat program. Or, since I'm doing this on a Mac, vm_stat.

# script/vmstat_tailer

IO.popen("vm_stat 1") do |f|
  # each_line keeps yielding until vm_stat exits; readline would raise EOFError at the end
  f.each_line do |line|
    puts line
  end
end

This runs vm_stat, which prints a new line of statistics every second, and echoes each line to the screen forever. We want it to go to a queue, though. To do that, let's first create a QueueWriter class that acts as a thin wrapper around Carrot. This gives you a place to put connection information.

class QueueWriter
  attr_accessor :exchange

  def initialize(name, opts = {})
    @exchange = Carrot.topic(name, { :host => QUEUE_HOST,
                               :port => QUEUE_PORT,
                               :user => QUEUE_USER,
                               :pass => QUEUE_PASS,
                               :vhost => QUEUE_VHOST}.merge(opts))
  end

  def write(message)
    exchange.publish(message)
  end

  def delete
    exchange.delete
  end
end

You'll need to set those config options (QUEUE_HOST, etc.) in environment.rb or an environment-specific file (e.g. config/environments/development.rb). In general, those are the five pieces of data you need to specify in order to set up a connection to RabbitMQ. For now, just whack the following into environment.rb:

QUEUE_HOST = 'localhost'
QUEUE_PORT = 5672
QUEUE_USER = 'guest'
QUEUE_PASS = 'guest'
QUEUE_VHOST = '/'

Those are defaults that should get you connected without having to do any setup. They should be pretty self-explanatory except for QUEUE_VHOST. Exchanges and queues in RabbitMQ live inside vhosts (virtual hosts), which are usually named sort of like directories in a filesystem. Permissions are set on a per-vhost basis, so you probably want to create a separate vhost for each application.
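If you'd rather not edit environment.rb on every machine, one option is to read the same five settings from environment variables and keep the defaults above as fallbacks. This is a sketch, not something the demo app requires; the environment variable names are an assumption that simply mirrors the constants.

```ruby
# Hypothetical alternative for environment.rb: read each connection setting
# from an environment variable of the same name, falling back to the
# out-of-the-box RabbitMQ defaults.
QUEUE_HOST  = ENV.fetch("QUEUE_HOST", "localhost")
QUEUE_PORT  = Integer(ENV.fetch("QUEUE_PORT", "5672")) # AMQP's standard port
QUEUE_USER  = ENV.fetch("QUEUE_USER", "guest")
QUEUE_PASS  = ENV.fetch("QUEUE_PASS", "guest")
# "/" is the default vhost; a per-application vhost (e.g. a made-up
# "/carrot_demo") keeps this app's permissions separate from others.
QUEUE_VHOST = ENV.fetch("QUEUE_VHOST", "/")
```

Once a per-application vhost and its permissions exist on the server, starting the app with QUEUE_VHOST set would point it there without touching the code.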

You can check that everything is working so far like so:

slim:carrot_demo edebill$ script/console
Loading development environment (Rails 2.3.5)
>> x = QueueWriter.new("hello")
=> #<QueueWriter:0x23cffd8 ... lots of stuff ... Carrot::AMQP::Exchange:0x27ce4e0 ...>}>>>
>> 

Then, from a command line, run:

vagrant@vagrant-karmic:~$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
amq.rabbitmq.log	topic
amq.match	headers
hello	topic
amq.headers	headers
amq.topic	topic
amq.direct	direct
amq.fanout	fanout
	direct
...done.
vagrant@vagrant-karmic:~$ 

You can see the "hello" topic exchange nestled in amongst the built-in ones. rabbitmqctl is your friend. Try running it as root with no options and read through the usage it prints. Commands like list_exchanges and list_queues let you check up on what's going on in your RabbitMQ server (don't forget to pass them your vhost with -p if you aren't using the default "/"). It's also how you set up vhosts, users, and passwords (add_vhost, add_user, then set_permissions).
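Since rabbitmqctl is how you see what's on the server, anything that audits or cleans up leftover entities ends up reading its text output. Here's a sketch of a parser for the list_exchanges output shown above. The function names are hypothetical, and the parsing rules assume exactly that format (a "Listing exchanges ..." line, one tab-separated name and type per exchange, then "...done."); the framing differs between RabbitMQ versions, so check yours before relying on it.

```ruby
# Parse `rabbitmqctl list_exchanges` output in the format shown above.
# parse_list_exchanges and app_exchanges are hypothetical helpers.
def parse_list_exchanges(output)
  lines = output.each_line.map(&:chomp)
  lines = lines.reject { |l| l.empty? || l.start_with?("Listing ") || l == "...done." }
  lines.map do |line|
    name, type = line.split("\t", 2)
    { name: name, type: type } # the default exchange has an empty name
  end
end

# Drop the built-in "amq.*" exchanges and the nameless default exchange,
# leaving the ones an application declared.
def app_exchanges(output)
  parse_list_exchanges(output).reject { |e| e[:name].empty? || e[:name].start_with?("amq.") }
end

sample = "Listing exchanges ...\n" \
         "amq.rabbitmq.log\ttopic\n" \
         "hello\ttopic\n" \
         "amq.direct\tdirect\n" \
         "\tdirect\n" \
         "...done.\n"
p app_exchanges(sample).map { |e| e[:name] } # prints ["hello"]
```

In practice the input would be the captured output of running rabbitmqctl list_exchanges -p / (or your own vhost) as root.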

Now, you need a reader. Something like the following is a good start:

class QueueReader
  attr_accessor :queue

  def initialize(exchangename, queuename, opts = {})
    opts = { :host => QUEUE_HOST,
           :port => QUEUE_PORT,
           :user => QUEUE_USER,
           :pass => QUEUE_PASS,
           :vhost => QUEUE_VHOST}.merge(opts)

    @queue = Carrot.queue(queuename, opts)
    @queue.bind(exchangename, opts)
  end

  def pop
    @queue.pop
  end

  def count
    @queue.message_count
  end
end

OK... so we've got a reader and a writer. Let's use them. Here's the new script/vmstat_tailer:

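# vmstat_tailer  run via script/runner

w = QueueWriter.new("vmstat")

begin
  IO.popen("vm_stat 1") do |f|
    # Publish each line of vm_stat output to the 'vmstat' exchange as it arrives.
    f.each_line { |line| w.write(line) }
  end
ensure
  # Runs on normal exit, on exceptions, and on Ctrl-C, but not on kill -9.
  w.delete
end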

Not a lot to it. It'll get an output line about once a second and stick it onto the 'vmstat' exchange on our RabbitMQ server. When the process exits, it will delete the exchange.

In order to see this, let's create a controller and hook it up to the web.

class VmstatController < ApplicationController
  def index
    q = QueueReader.new("vmstat", session[:session_id])
    
    lines = []
    while line = q.pop
      lines << line
    end

    render :json => lines
  end
end

The first time this is called, it creates a queue and binds it to the vmstat exchange. It then pops everything on that queue (which will be empty the first time) and renders it as JSON. The next time it's hit, it declares the queue again, but because the queue already exists, that just reconnects to it. The queue has been collecting vmstat lines since the first request, so this time it will probably find something to output.
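Here's a toy simulation of that request sequence with no RabbitMQ involved (ToyExchange and poll are hypothetical stand-ins). It models two things the paragraph relies on: an exchange with no queue bound to it simply drops what's published, and a queue keeps collecting messages between requests. Like the controller's loop, it assumes that popping an empty queue returns nil.

```ruby
# Toy stand-ins (hypothetical, no RabbitMQ): an exchange that copies each
# message into every bound queue, plus one "request" to the controller.
class ToyExchange
  def initialize
    @queues = {} # queue name => Array of waiting messages
  end

  # Like declaring and binding a queue: created the first time, reused after.
  def queue_for(name)
    @queues[name] ||= []
  end

  # Every bound queue gets a copy; with no queues bound, the message is lost.
  def publish(message)
    @queues.each_value { |queue| queue << message }
  end
end

# One hit on the controller: attach to this session's queue and drain it.
def poll(exchange, session_id)
  queue = exchange.queue_for(session_id)
  lines = []
  while (line = queue.shift) # Array#shift returns nil once the queue is empty
    lines << line
  end
  lines
end

vmstat = ToyExchange.new
vmstat.publish("published before anyone is watching") # dropped
p poll(vmstat, "session-abc")  # prints [] (first request creates the queue)
vmstat.publish("line 1")
vmstat.publish("line 2")
p poll(vmstat, "session-abc")  # prints ["line 1", "line 2"]
p poll(vmstat, "session-abc")  # prints [] (already drained)
```

Note that the hash inside ToyExchange only ever grows, one entry per session; that's the same leak the Gotcha section is about.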

Every queue needs a name, so we just use the session ID. That's unique per browser session, so each viewer gets their own queue.

It's pretty easy to hit this action with jQuery's getJSON and use it to add content to a page. Voilà: close-to-real-time log tailing over the web that won't pound your database or re-process the output repeatedly.

Gotcha

Can you spot the resource leaks?

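The above implementation is going to keep creating more and more queues (one for every browser session that ever looks at the page), and it can leave exchanges behind. We try to clean up our exchanges, but we don't have any guarantees about that: a crash or kill -9 skips the cleanup. We don't even try to clean up our queues.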

So, what can we do?

RabbitMQ doesn't give you an API for querying what queues and exchanges are on the server. You can do that with rabbitmqctl, but you'll need to run it as root. This means you've got a few options:

  1. Ignore it. Restart RabbitMQ occasionally and all these entities (which were not declared durable) will disappear. That means planned downtime, which would be OK for something low-volume but isn't a great idea for a robust production system.
  2. Write a cron job to run rabbitmqctl, parse the output, and clean up. This sucks because your cron job has to run as root.
  3. Save the names of your exchanges and queues when you create them. Periodically go back and clean them up. The trick here is figuring out when they're no longer in use. Maybe an ensure block on whatever writes to the exchange, or some handy bit of domain knowledge.
  4. Install a plugin for RabbitMQ that will expose the names of your entities via a web interface. I haven't seen one purpose-built for this, but there is a stats plugin that seemed like it might do the job.
  5. Post a question on StackOverflow and see if anyone else has an answer.
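Option 3 might look something like the following sketch. Everything in it is hypothetical: EntityRegistry is a made-up class, the hash would really need to live somewhere shared such as a database table, and a periodic job would delete each stale exchange or queue on the server before forgetting it. The clock is injectable only so the example can fake the passage of time.

```ruby
# Sketch of option 3 (hypothetical class): remember each exchange/queue you
# create, record when it was last used, and list the ones that went quiet.
class EntityRegistry
  # clock is a callable returning the current Time; injectable for testing.
  def initialize(clock = -> { Time.now })
    @clock = clock
    @last_used = {} # e.g. "queue:session-abc" => Time
  end

  # Call whenever an exchange or queue is created or used.
  def touch(kind, name)
    @last_used["#{kind}:#{name}"] = @clock.call
  end

  # Keys of entities not touched for more than max_idle seconds.
  def stale(max_idle)
    now = @clock.call
    @last_used.select { |_key, time| now - time > max_idle }.keys
  end

  # Call after the stale entity has actually been deleted on the server.
  def forget(key)
    @last_used.delete(key)
  end
end

now = Time.at(0)
registry = EntityRegistry.new(-> { now })
registry.touch(:queue, "session-abc")   # a viewer opens the page
registry.touch(:exchange, "vmstat")     # the tailer starts publishing
now += 600                              # ten minutes pass...
registry.touch(:exchange, "vmstat")     # ...and the tailer is still running
p registry.stale(300)                   # prints ["queue:session-abc"]
```

Calling touch(:queue, session_id) from VmstatController#index on every request would be the handy bit of domain knowledge here: a queue goes stale once its viewer stops polling.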

Published on 15/05/2010 at 13h44
