commit 51cfdf73d4112809aa7a01502300e491a305f44f Author: امیرحسین مقیسه Date: Wed Apr 19 02:00:08 2023 +0430 Initial Commit diff --git a/consumergroup.py b/consumergroup.py new file mode 100644 index 0000000..a09d9b6 --- /dev/null +++ b/consumergroup.py @@ -0,0 +1,44 @@ +import json +import typer +import random +import time +from walrus import Database +from enum import Enum + +BLOCK_TIME = 5000 +STREAM_KEY = "GAME_RECORD_STREAM" + +app = typer.Typer() + + +class StartFrom(str, Enum): + beginning = "0" + latest = "$" + + +@app.command() +def start(group_id, consumer_id: str, start_from: StartFrom = StartFrom.latest): + rdb = Database(db=8) + cg = rdb.consumer_group(group_id, [STREAM_KEY], consumer=consumer_id) + cg.create() # Create the consumer group. Default starts from the latest + if start_from == StartFrom.beginning: + cg.set_id(start_from) + + while True: + print("Reading stream...") + streams = cg.read(1, block=BLOCK_TIME) + for stream_id, messages in streams: + for message_id, message in messages: + try: + print(f"processing {stream_id}::{message_id}::{message}") + payload = json.loads(message[b"data"]) + cg.game_record_stream.ack(message_id) + + + except: + print(f"Error occured in processing {message_id}") + pass + + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..63fe952 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +redis==4.0.2 +walrus==0.8.2 +typer==0.4.0 \ No newline at end of file