Metadata-Version: 2.4
Name: amqpipe
Version: 1.2
Summary: Twisted based pipeline framework for AMQP
Home-page: https://github.com/Fatal1ty/amqpipe
Author: Alexander Tikhonov
Author-email: random.gauss@gmail.com
License: MIT
Platform: all
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Console
Classifier: Framework :: Twisted
Classifier: License :: OSI Approved :: MIT License
Classifier: Natural Language :: English
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: POSIX
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System
Classifier: Topic :: System :: Software Distribution
Requires-Dist: twisted==17.9.0
Requires-Dist: pika==0.11.0
Requires-Dist: colorlog==3.1.0
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: home-page
Dynamic: license
Dynamic: platform
Dynamic: requires-dist
Dynamic: summary

AMQPipe
=======

.. image:: https://travis-ci.org/Fatal1ty/amqpipe.svg?branch=master
    :target: https://travis-ci.org/Fatal1ty/amqpipe

.. image:: https://requires.io/github/Fatal1ty/amqpipe/requirements.svg?branch=master
    :target: https://requires.io/github/Fatal1ty/amqpipe/requirements/?branch=master
    :alt: Requirements Status

.. image:: https://img.shields.io/pypi/v/amqpipe.svg
    :target: https://pypi.python.org/pypi/amqpipe

.. image:: https://img.shields.io/pypi/pyversions/amqpipe.svg
    :target: https://pypi.python.org/pypi/amqpipe/

.. image:: https://img.shields.io/badge/license-MIT-blue.svg
    :target: https://raw.githubusercontent.com/Fatal1ty/amqpipe/master/LICENSE

Twisted based pipeline framework for AMQP. It allow you to create fast
asynchronous services which follow ideology:

-  get message from queue
-  doing something with message
-  publish some result

Installation
------------

Install via pip:

::

        pip install amqpipe

Basic usage
-----------

The minimal module based on AMQPipe is:

.. code:: python

    from amqpipe import AMQPipe

    pipe = AMQPipe()
    pipe.run()

It will simply get all messages from one RabbitMQ queue and publish them
to other RabbitMQ exchange.

Now we define some action on messages:

.. code:: python

    import hashlib
    from amqpipe import AMQPipe

    def action(message):
        return hashlib.md5(message).hexdigest()

    pipe = AMQPipe(action=action)
    pipe.run()

It will publish md5 checksum for every message as result.

If messages in input queue are in predefined format then you can define
converter-function:

.. code:: python

    import hashlib
    from amqpipe import AMQPipe

    def converter(message):
        return message['text']

    def action(text):
        return hashlib.md5(text).hexdigest()

    pipe = AMQPipe(converter=converter, action=action)
    pipe.run()

You can define service-specific arguments:

.. code:: python

    import hashlib
    from amqpipe import AMQPipe

    class Processor:
        def set_field(self, field):
            self.field = field

    processor = Processor()

    def init(args):
        processor.set_field(args.field)

    def converter(message):
        return message.get(processor.field)

    def action(text):
        return hashlib.md5(text).hexdigest()

    pipe = AMQPipe(converter, action, init)
    pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
    pipe.run()

You can connect to database in ``init`` function or do some other things
for initialization.

If your action returns Deferred then result would be published to
RabbitMQ when this Deferred will be resolved:

.. code:: python

    import logging
    from twisted.internet import defer
    from amqpipe import AMQPipe

    logger = logging.getLogger(__name__)

    class Processor:
        def set_field(self, field):
            self.field = field

    processor = Processor()

    def init(args):
        connect_to_db()
        ...

    def converter(message):
        return message.get(processor.field)

    @defer.inlineCallbacks
    def action(text):
        result = yield db_query(text)
        logger.info('Get from db: %s', result)
        defer.returnValue(result)

    pipe = AMQPipe(converter, action, init)
    pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
    pipe.run()

Init function may return Deferred too.
