kcpy is a Kinesis consumer written purely in Python. It is a lightweight wrapper on top of the AWS Python library boto3. You can also consume records from a Kinesis Data Stream (KDS) via:
- Lambda function: I have a demo, kinesis-lambda-sqs-demo, showing how to consume records in a serverless, real-time way.
- Kinesis Firehose: an AWS managed service that easily saves records into different sinks, such as S3, Elasticsearch, and Redshift.
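Under the hood, a consumer like this wraps the low-level boto3 Kinesis calls. Here is a rough sketch of what that looks like; the stream name, region, and the single-shard loop are illustrative assumptions, not kcpy's actual implementation:

```python
import boto3

# Placeholders: adjust the stream name and region for your setup.
client = boto3.client('kinesis', region_name='us-east-1')

# Discover the stream's shards, then read one shard from its oldest record.
shards = client.describe_stream(StreamName='my_stream_name')['StreamDescription']['Shards']
iterator = client.get_shard_iterator(
    StreamName='my_stream_name',
    ShardId=shards[0]['ShardId'],
    ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']

while iterator:
    resp = client.get_records(ShardIterator=iterator, Limit=100)
    for record in resp['Records']:
        print(record)
    iterator = resp.get('NextShardIterator')
```

kcpy hides this shard-iterator bookkeeping behind a single iterable consumer, as shown below.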
Install the package via pip:
pip install kcpy
from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name')
for record in consumer:
    print(record)
The output would look like:
{
    'ApproximateArrivalTimestamp': datetime.datetime(2018, 11, 13, 11, 57, 55, 117807),
    'Data': b'Jessica Walter',
    'PartitionKey': 'Jessica Walter',
    'SequenceNumber': '1'
}
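The Data field holds whatever bytes the producer wrote; decode it however your producer encoded it (e.g. record['Data'].decode('utf-8')). For instance, a record like the one above could have been written with boto3's put_record; the stream name and payload here are placeholders:

```python
import boto3

client = boto3.client('kinesis', region_name='us-east-1')

# Write one record whose payload and partition key match the sample output above.
client.put_record(
    StreamName='my_stream_name',
    Data=b'Jessica Walter',
    PartitionKey='Jessica Walter',
)
```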
Or, you can consume stream data with checkpointing:
from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name', consumer_name='my_consumer', checkpoint=True)
for record in consumer:
    print(record)
The diagram below shows the schema of checkpointing:
producer
[stream_1] |
+---------------+---+---+---+---+---+---+---+---+ |
| shard_1 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |...| <-------------------+
+---------------+---+---+---+---+---+---+---+---+ |
| shard_2 | 1 | 2 | 3 | 4 | 5 |...| <---------------------------+
+---------------+---+---+---+---+---+---+---+---+---+ |
| shard_3 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |...| <---------------+
+---------------+---+---+---+---+---+---+---+---+---+
^ ^
| |
consumer_1 consumer_2
| |
| +---------+
| |
+------------------+ |
| |
v |
+---------------+-------------+----------+--------+ |
| consumer_name | stream_name | shard_id | seq_no | |
+---------------+-------------+----------+--------+ |
| consumer_1 | stream_1 | shard_1 | 5 | |
| consumer_1 | stream_1 | shard_2 | 15 | |
| consumer_1 | stream_1 | ... | 15 | |
| consumer_1 | stream_1 | shard_N | XX | |
| consumer_2 | stream_1 | shard_1 | 6 | <---+
+---------------+-------------+----------+--------+
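Checkpoints are stored in a simple SQLite database (see the changelog below). The following is a hypothetical sketch of how a table like the one in the diagram could be maintained; the table and column names mirror the diagram and are not necessarily kcpy's actual schema:

```python
import sqlite3

conn = sqlite3.connect('checkpoints.db')
conn.execute("""
    CREATE TABLE IF NOT EXISTS checkpoints (
        consumer_name TEXT,
        stream_name   TEXT,
        shard_id      TEXT,
        seq_no        TEXT,
        PRIMARY KEY (consumer_name, stream_name, shard_id)
    )
""")

def save_checkpoint(consumer_name, stream_name, shard_id, seq_no):
    # Upsert the latest sequence number processed for this shard,
    # so the consumer can resume from it after a restart.
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
        (consumer_name, stream_name, shard_id, seq_no),
    )
    conn.commit()

save_checkpoint('consumer_1', 'stream_1', 'shard_1', '5')
```

On restart, a consumer would look up its row for each shard and resume reading after the stored sequence number.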
- Read records from a stream with multiple shards
- Save a checkpoint for each shard consumer of a stream
- Add type checking with mypy
- Add tox for automating multiple testing environments
- Add the config for Travis CI
- Support other storage solutions (mysql, dynamodb, redis, etc.) for checkpointing
- Rebalance when the number of shards changes
- Allow kcpy to run on multiple machines
- Add Travis CI config and remove Python 3.5.
- Fix some issues in setup.py.
- Add consumer checkpointing with a simple sqlite storage solution.
- Pass aws configurations into boto3 client directly.
- Update the README.
- Add markdown support for long description.
- Add a long description.
- First version of kcpy.
Copyright (c) 2018 Hengfeng Li. kcpy is free software and may be redistributed under the terms specified in the LICENSE file.