import json import typer import random import time from walrus import Database from enum import Enum BLOCK_TIME = 5000 STREAM_KEY = "GAME_RECORD_STREAM" app = typer.Typer() class StartFrom(str, Enum): beginning = "0" latest = "$" @app.command() def start(group_id, consumer_id: str, start_from: StartFrom = StartFrom.latest): rdb = Database(db=8) cg = rdb.consumer_group(group_id, [STREAM_KEY], consumer=consumer_id) cg.create() # Create the consumer group. Default starts from the latest if start_from == StartFrom.beginning: cg.set_id(start_from) while True: print("Reading stream...") streams = cg.read(1, block=BLOCK_TIME) for stream_id, messages in streams: for message_id, message in messages: try: print(f"processing {stream_id}::{message_id}::{message}") payload = json.loads(message[b"data"]) cg.game_record_stream.ack(message_id) except: print(f"Error occured in processing {message_id}") pass if __name__ == "__main__": app()