667 Stars 146 Forks Apache License 2.0 1.2K Commits 82 Opened issues



asyncio client for Kafka


AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def send_one():
        producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message
            await producer.send_and_wait("my_topic", b"Super message")
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

    asyncio.run(send_one())
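As a further sketch, the producer can be given a serializer so that callers pass Python objects rather than raw bytes. The example assumes the ``value_serializer`` constructor argument of ``AIOKafkaProducer``. The helper names ``serialize_value`` and ``send_json`` are hypothetical, as are the topic name and payload. ``aiokafka`` is imported inside the coroutine so the serializer can be exercised without a broker.

```python
import json


def serialize_value(value):
    """Encode a JSON-compatible object as UTF-8 bytes (hypothetical helper)."""
    return json.dumps(value, sort_keys=True).encode("utf-8")


async def send_json(topic, payload, bootstrap_servers="localhost:9092"):
    # Imported lazily so that serialize_value can be tested on its own.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        # Assumption: the producer applies this callable to every value.
        value_serializer=serialize_value,
    )
    await producer.start()
    try:
        # The payload is a plain dict; the serializer turns it into bytes.
        await producer.send_and_wait(topic, payload)
    finally:
        await producer.stop()
```

With a broker running, this could be driven by ``asyncio.run(send_json("my_topic", {"id": 1}))``.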


AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >=

Example of AIOKafkaConsumer usage:

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    import asyncio

    async def consume():
        consumer = AIOKafkaConsumer(
            'my_topic', 'my_other_topic',
            bootstrap_servers='localhost:9092',
            group_id="my-group")
        # Get cluster layout and join group my-group
        await consumer.start()
        try:
            # Consume messages
            async for msg in consumer:
                print("consumed: ", msg.topic, msg.partition, msg.offset,
                      msg.key, msg.value, msg.timestamp)
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

    asyncio.run(consume())
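To illustrate the load balancing described above, the sketch below starts several consumers that share one ``group_id``, so the group coordinator can divide the topic's partitions among them. The names ``describe``, ``run_worker`` and ``run_group`` are hypothetical, and the topic and group names are placeholders. How partitions are actually split depends on the broker and on the topic's partition count.

```python
import asyncio


def describe(msg):
    """Format a consumed record as 'topic[partition]@offset' (hypothetical helper)."""
    return f"{msg.topic}[{msg.partition}]@{msg.offset}"


async def run_worker(name, topic, group_id, bootstrap_servers="localhost:9092"):
    # Imported lazily so that describe() can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        topic, bootstrap_servers=bootstrap_servers, group_id=group_id)
    await consumer.start()
    try:
        async for msg in consumer:
            print(name, describe(msg), msg.value)
    finally:
        # Leaving the group lets the remaining workers take over partitions.
        await consumer.stop()


async def run_group(topic, group_id, workers=2):
    # Every worker joins the same group, so partitions are shared among them.
    await asyncio.gather(
        *(run_worker(f"worker-{i}", topic, group_id) for i in range(workers)))
```

With a broker running, ``asyncio.run(run_group("my_topic", "my-group"))`` would start two workers. A topic with a single partition would leave one of them idle.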

Running tests

Docker is required to run tests. See the Docker documentation for installation notes. Also note that the compression libraries for Python require the Python development package, or the Python source header files, for compilation on Linux. NOTE: You will also need a valid Java installation. It is required for a utility used to generate ssh keys for some tests.

Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+)::

    sudo apt-get install -y libsnappy-dev
    make setup

Running tests with coverage::

    make cov

To run tests with a specific version of Kafka (the default is 1.0.2), set the ``KAFKA_VERSION`` variable.


Test running cheatsheet:

* ``make test FLAGS="-l -x --ff"`` - run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
* ``make test FLAGS="-k consumer"`` - run only the consumer tests.
* ``make test FLAGS="-m 'not ssl'"`` - run tests excluding ssl.
* ``make test FLAGS="--no-pull"`` - do not try to pull a new docker image before the test run.
