From 51cfdf73d4112809aa7a01502300e491a305f44f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D8=A7=D9=85=DB=8C=D8=B1=D8=AD=D8=B3=DB=8C=D9=86=20=D9=85?= =?UTF-8?q?=D9=82=DB=8C=D8=B3=D9=87?= Date: Wed, 19 Apr 2023 02:00:08 +0430 Subject: [PATCH] Initial Commit --- consumergroup.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 3 +++ 2 files changed, 47 insertions(+) create mode 100644 consumergroup.py create mode 100644 requirements.txt diff --git a/consumergroup.py b/consumergroup.py new file mode 100644 index 0000000..a09d9b6 --- /dev/null +++ b/consumergroup.py @@ -0,0 +1,44 @@ +import json +import typer +import random +import time +from walrus import Database +from enum import Enum + +BLOCK_TIME = 5000 +STREAM_KEY = "GAME_RECORD_STREAM" + +app = typer.Typer() + + +class StartFrom(str, Enum): + beginning = "0" + latest = "$" + + +@app.command() +def start(group_id, consumer_id: str, start_from: StartFrom = StartFrom.latest): + rdb = Database(db=8) + cg = rdb.consumer_group(group_id, [STREAM_KEY], consumer=consumer_id) + cg.create() # Create the consumer group. Default starts from the latest + if start_from == StartFrom.beginning: + cg.set_id(start_from) + + while True: + print("Reading stream...") + streams = cg.read(1, block=BLOCK_TIME) + for stream_id, messages in streams: + for message_id, message in messages: + try: + print(f"processing {stream_id}::{message_id}::{message}") + payload = json.loads(message[b"data"]) + cg.game_record_stream.ack(message_id) + + + except: + print(f"Error occured in processing {message_id}") + pass + + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..63fe952 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +redis==4.0.2 +walrus==0.8.2 +typer==0.4.0 \ No newline at end of file