
How Could A Distributed Queue-like-thing Be Implemented On Top Of An RDBMS Or NoSQL Datastore Or Other Messaging System (e.g., RabbitMQ)?

From the wouldn't-it-be-cool-if category of questions ... By 'queue-like-thing' I mean supports the following operations: append(entry:Entry) - add entry to tail of queue take()

Solution 1:

Redis supports lists and ordered sets: http://redis.io/topics/data-types#lists

It also supports transactions and publish/subscribe messaging. So, yes, I would say this can be easily done on Redis.

Update: In fact, about 80% of it has been done many times: http://www.google.co.uk/search?q=python+redis+queue

Several of those hits could be upgraded to add what you want. You would have to use transactions to implement the promote/demote operations.

It might be possible to use Lua on the server side to create that functionality, rather than having it in client code. Alternatively, you could create a thin wrapper around Redis on the server that implements just the operations you want.

Solution 2:

Python: "Batteries Included"

Rather than looking to a data store like RabbitMQ, Redis, or an RDBMS, I think Python and a couple of libraries have more than enough to solve this problem. Some may complain that this do-it-yourself approach is reinventing the wheel, but I prefer running a hundred lines of Python code over managing another data store.

Implementing a Priority Queue

The operations that you define (append, take, promote, and demote) describe a priority queue. Unfortunately Python doesn't have a built-in priority queue data type. But it does have a heap library called heapq, and priority queues are often implemented as heaps. Here's my implementation of a priority queue meeting your requirements:

import heapq

class PQueue:
    """
    Implements a priority queue with append, take, promote, and demote
    operations.
    """
    def __init__(self):
        """
        Initialize empty priority queue.
        self.toll is max(priority) and max(rowid) in the queue
        self.heap is the heap maintained for take command
        self.rows is a mapping from rowid to items
        self.pris is a mapping from priority to items
        """
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        """
        Append value to our priority queue.
        The new value is added with lowest priority as an item. Items are
        threeple lists consisting of [priority, rowid, value]. The rowid
        is used by the promote/demote commands.
        Returns the new rowid corresponding to the new item.
        """
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        """
        Take the highest priority item out of the queue.
        Returns the value of the item.
        """
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def promote(self, rowid):
        """
        Promote an item in the queue.
        The promoted item swaps position with the next highest item.
        Returns the number of affected rows.
        """
        if rowid not in self.rows:
            return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        next_pri = item_pri - 1
        if next_pri in self.pris:
            iota = self.pris[next_pri]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0

The demote command is nearly identical to the promote command, so I'll omit it for brevity. Note that this depends only on Python's lists, dicts, and heapq library.
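For readers who want to see the omitted piece, here is a guess at what demote might look like. This is a sketch, not the author's code: PQueueSketch is a condensed copy of the class above, and the _swap helper and its step parameter are names invented here so that promote and demote can share one body. It assumes demote mirrors promote exactly, swapping with the item one priority lower instead of one higher.

```python
import heapq


class PQueueSketch(object):
    """Condensed copy of PQueue with a hypothetical demote() added."""

    def __init__(self):
        self.toll = 0      # highest priority number / rowid handed out
        self.heap = []     # items are [priority, rowid, value]
        self.rows = {}     # rowid -> item
        self.pris = {}     # priority -> item

    def append(self, value):
        self.toll += 1
        item = [self.toll, self.toll, value]
        # A plain append keeps the heap valid: the new priority is the largest.
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def _swap(self, rowid, step):
        # Swap the (rowid, value) payload with the item `step` priorities away.
        # Priorities stay where they are, so the heap order is untouched.
        if rowid not in self.rows:
            return 0
        item = self.rows[rowid]
        other = self.pris.get(item[0] + step)
        if other is None:
            return 0
        item[1], other[1] = other[1], item[1]
        item[2], other[2] = other[2], item[2]
        self.rows[item[1]] = item
        self.rows[other[1]] = other
        return 2

    def promote(self, rowid):
        return self._swap(rowid, -1)

    def demote(self, rowid):
        return self._swap(rowid, +1)
```

Like promote, this version returns 0 when there is no item at the adjacent priority, which includes the case where that neighbour has already been taken.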

Serving our Priority Queue

Now that we have the PQueue data type, we'd like to allow distributed interactions with an instance. A great library for this is gevent. Though gevent is relatively new and still in beta, it's wonderfully fast and well tested. With gevent, we can set up a socket server listening on localhost:4040 pretty easily. Here's my server code:

from gevent.server import StreamServer

pqueue = PQueue()

def pqueue_server(sock, addr):
    # One request per connection: read a command, run it, send the result.
    text = sock.recv(1024)
    cmds = text.split(' ')
    if cmds[0] == 'append':
        result = pqueue.append(cmds[1])
    elif cmds[0] == 'take':
        result = pqueue.take()
    elif cmds[0] == 'promote':
        result = pqueue.promote(int(cmds[1]))
    elif cmds[0] == 'demote':
        result = pqueue.demote(int(cmds[1]))
    else:
        result = ''
    sock.sendall(str(result))
    print 'Request:', text, '; Response:', str(result)

# args holds the parsed command-line options (parsing not shown).
if args.listen:
    server = StreamServer(('127.0.0.1', 4040), pqueue_server)
    print 'Starting pqueue server on port 4040...'
    server.serve_forever()

Before that runs in production, you'll of course want to do some better error/buffer handling. But it'll work just fine for rapid-prototyping. Notice that this doesn't require any locking around the pqueue object. Gevent doesn't actually run code in parallel, it just gives that impression. The drawback is that more cores won't help but the benefit is lock-free code.
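As one possible take on the "better error handling" mentioned above, the sketch below pulls command parsing out of the socket handler so that bad input produces an error reply instead of an exception. The handle_command function, the 'ERR ...' reply format, and the ListQueue stand-in are all assumptions made for illustration; none of them come from the original server. It does not address buffer handling (requests longer than one recv call).

```python
def handle_command(queue, text):
    """Parse one request and return the response string.

    `queue` is any object with append/take/promote/demote methods.
    Errors come back as 'ERR ...' strings (a made-up convention).
    """
    parts = text.strip().split(' ', 1)
    cmd = parts[0]
    arg = parts[1] if len(parts) > 1 else None
    try:
        if cmd == 'append':
            if arg is None:
                return 'ERR append needs a value'
            return str(queue.append(arg))
        if cmd == 'take':
            return str(queue.take())
        if cmd in ('promote', 'demote'):
            # int(None) raises TypeError, int('x') raises ValueError.
            return str(getattr(queue, cmd)(int(arg)))
    except IndexError:
        # heapq.heappop and list.pop both raise IndexError when empty.
        return 'ERR queue is empty'
    except (TypeError, ValueError):
        return 'ERR rowid must be an integer'
    return 'ERR unknown command'


class ListQueue(object):
    """Stand-in with the same method names, only so the sketch runs alone."""

    def __init__(self):
        self.items = []

    def append(self, value):
        self.items.append(value)
        return len(self.items)

    def take(self):
        return self.items.pop(0)

    def promote(self, rowid):
        return 0

    def demote(self, rowid):
        return 0
```

Splitting with a limit of one keeps values that contain spaces intact, which differs from the server above, where only the first word after the command is used.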

Don't get me wrong, the gevent StreamServer will process multiple requests at the same time. But it switches between requests through cooperative multitasking, which means each coroutine has to yield its time slice voluntarily. While gevent's socket I/O functions are designed to yield, our pqueue implementation is not. Fortunately, the pqueue completes its tasks really quickly.

A Client Too

While prototyping, I found it useful to have a client as well. It took some googling to write a client so I'll share that code too:

# gsocket is presumably gevent's socket module (from gevent import socket as gsocket).
if args.client:
    while True:
        msg = raw_input('> ')
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(msg)
        text = sock.recv(1024)
        sock.close()
        print text

To use the new data store, first start the server and then start the client. At the client prompt you ought to be able to do:

> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two

Scaling Extremely Well

Given your thinking about a data store, it seems you're really concerned with throughput and durability. But "scale extremely well" doesn't quantify your needs. So I decided to benchmark the above with a test function. Here's the test function:

def test():
    import time
    import urllib2
    import subprocess

    import random
    random = random.Random(0)

    from progressbar import ProgressBar, Percentage, Bar, ETA
    widgets = [Percentage(), Bar(), ETA()]

    def make_name():
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        return ''.join(random.choice(alphabet)
                       for rpt in xrange(random.randrange(3, 20)))

    def make_request(cmds):
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(cmds)
        text = sock.recv(1024)
        sock.close()

    print 'Starting server and waiting 3 seconds.'
    subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                    shell=True)
    time.sleep(3)

    tests = []
    def wrap_test(name, limit=10000):
        def wrap(func):
            def wrapped():
                progress = ProgressBar(widgets=widgets)
                for rpt in progress(xrange(limit)):
                    func()
                secs = progress.seconds_elapsed
                print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                    name, limit, secs, limit / secs)
            tests.append(wrapped)
            return wrapped
        return wrap

    def direct_append():
        name = make_name()
        pqueue.append(name)

    count = 1000000
    @wrap_test('Loaded', count)
    def direct_append_test(): direct_append()

    def append():
        name = make_name()
        make_request('append ' + name)

    @wrap_test('Appended')
    def append_test(): append()

    ...

    print 'Running speed tests.'
    for tst in tests: tst()

Benchmark Results

I ran 6 tests against the server running on my laptop. I think the results scale extremely well. Here's the output:

Starting server and waiting 3 seconds.
Running speed tests.
100%|############################################################|Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%|############################################################|Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%|############################################################|Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%|############################################################|Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%|############################################################|Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%|############################################################|Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s

Final Frontier: Durability

Finally, durability is the only problem I didn't completely prototype. But I don't think it's that hard either. In our priority queue, the heap (list) of items has all the information we need to persist the data type to disk. Since gevent also lets us spawn functions as greenlets that run alongside the server, I imagined using a function like this:

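import gevent

def save_heap(heap, toll):
    # Write a snapshot of the heap to disk, one item per line.
    name = 'heap-{0}.txt'.format(toll)
    with open(name, 'w') as temp:
        for val in heap:
            temp.write(str(val) + '\n')
            gevent.sleep(0)  # yield so the server can keep answering requests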

and adding a save function to our priority queue:

def save(self):
    heap_copy = tuple(self.heap)
    toll = self.toll
    gevent.spawn(save_heap, heap_copy, toll)

You could now copy the Redis model of forking and writing the data store to disk every few minutes. If you need even greater durability, couple the above with a system that logs commands to disk. Together, those are the AOF and RDB persistence methods that Redis uses.
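The command-logging half of that idea could be sketched as follows. This is only an illustration of the append-only-log approach, not how Redis implements AOF: the CommandLog class and its record/replay methods are hypothetical names, and the sketch assumes each command fits on one line (values containing newlines would need escaping).

```python
import os


class CommandLog(object):
    """Append-only log of queue commands that can be replayed on startup."""

    def __init__(self, path):
        self.path = path
        self._file = open(path, 'a')

    def record(self, command):
        # Write the command and force it to disk before acknowledging it.
        self._file.write(command + '\n')
        self._file.flush()
        os.fsync(self._file.fileno())

    def replay(self, apply):
        # Feed every logged command, oldest first, to the `apply` callback.
        with open(self.path) as log:
            for line in log:
                apply(line.rstrip('\n'))

    def close(self):
        self._file.close()
```

On startup the server would call replay with a function that executes each command against a fresh queue, and during normal operation it would call record before replying to the client. Calling fsync on every command is the slowest and safest choice; syncing less often trades durability for throughput.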

Solution 3:

WebSphere MQ can do almost all of this.

Promote/demote is almost possible, either by removing the message from the queue and putting it back with a higher or lower priority, or by using the "CORRELID" as a sequence number.
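The remove-and-requeue idea can be shown in miniature with plain Python. The sketch below is not WebSphere MQ code and uses none of its API; RequeueingQueue and its method names are invented for illustration, and it assumes that a smaller priority number is served first.

```python
import heapq
import itertools


class RequeueingQueue(object):
    """Priority queue where changing priority means remove, then re-add."""

    def __init__(self):
        self._heap = []
        self._entries = {}                 # msg_id -> live heap entry
        self._counter = itertools.count()  # tie-breaker keeps FIFO order

    def put(self, msg_id, body, priority):
        if msg_id in self._entries:
            self.remove(msg_id)
        # Entry layout: [priority, sequence, msg_id, body, alive]
        entry = [priority, next(self._counter), msg_id, body, True]
        self._entries[msg_id] = entry
        heapq.heappush(self._heap, entry)

    def remove(self, msg_id):
        # Mark the entry dead instead of searching the heap for it.
        entry = self._entries.pop(msg_id)
        entry[4] = False
        return entry[3]

    def reprioritize(self, msg_id, new_priority):
        body = self.remove(msg_id)
        self.put(msg_id, body, new_priority)

    def get(self):
        # Skip over entries that were removed after being pushed.
        while self._heap:
            entry = heapq.heappop(self._heap)
            if entry[4]:
                del self._entries[entry[2]]
                return entry[3]
        raise IndexError('get from an empty queue')
```

Note that a re-queued message goes behind other messages of the same priority, so this changes priority classes rather than swapping two adjacent entries the way promote and demote were described in the question.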

Solution 4:

What's wrong with RabbitMQ? It sounds exactly like what you need.

We also use Redis extensively in our production environment, but it lacks some of the functionality queues usually have, such as marking a task as complete or re-sending a task that isn't completed within some TTL. On the other hand, it has features a queue doesn't: it is a general-purpose store, and it is REALLY fast.

Solution 5:

Use Redisson. It implements the familiar List, Queue, BlockingQueue, and Deque Java interfaces in a distributed way, backed by Redis. Example with a Deque:

Redisson redisson = Redisson.create();

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

redisson.shutdown();

Other samples:

https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#77-list
https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#78-queue
https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#710-blocking-queue
