-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
163 lines (145 loc) · 6.16 KB
/
main.py
File metadata and controls
163 lines (145 loc) · 6.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import asyncio
import time
from typing import List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from aiokafka import AIOKafkaConsumer
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
# FastAPI application that relays Kafka messages to browsers in real time.
app = FastAPI(title="Real-Time Kafka Consumer")

# Expose the ``public`` folder under /static (CSS, images, etc.).
app.mount("/static", StaticFiles(directory="public"), name="static")

# The broker address is read from the environment and deliberately has no
# default: when it is missing or empty, the HTML page and the consumer stay
# disabled.
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
KAFKA_AVAILABLE = True if KAFKA_BOOTSTRAP_SERVERS else False
if KAFKA_AVAILABLE is False:
    print("WARNING: KAFKA_BOOTSTRAP_SERVERS env var is not set. Disabling HTML and consumer startup.")

# Topic consumed at startup; ``current_topic`` tracks the one in use.
DEFAULT_TOPIC = "default-topic"
current_topic = DEFAULT_TOPIC

# Handle of the background consumer task (None until one is created).
consumer_task = None

# WebSocket connections that currently receive broadcast messages.
active_connections: List[WebSocket] = list()
# Function to run the Kafka consumer on a given topic.
async def consume_kafka_messages(topic: str):
    """Consume ``topic`` from Kafka and broadcast each record to every WebSocket.

    The consumer is created with a new group id (``rewind-<unix time>``) and
    ``auto_offset_reset="earliest"`` on every call. It runs until the
    surrounding task is cancelled or the consumer raises; in both cases the
    consumer is stopped in the ``finally`` clause.

    Args:
        topic: Name of the Kafka topic to subscribe to.
    """
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=f"rewind-{int(time.time())}",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    print(f"Kafka consumer started, subscribed to topic='{topic}'")
    try:
        async for msg in consumer:
            if msg.value is None:
                # A record without a payload has nothing to forward, and
                # calling .decode() on it would kill the whole consumer task.
                continue
            # Replace undecodable bytes instead of raising, so that a single
            # malformed record cannot terminate the stream for every client.
            message_str = msg.value.decode("utf-8", errors="replace")
            # Iterate over a snapshot: WebSocket handlers may append to or
            # remove from ``active_connections`` while this coroutine is
            # suspended in ``send_text``, and mutating a list that is being
            # iterated makes the loop skip entries.
            for conn in list(active_connections):
                try:
                    await conn.send_text(message_str)
                except Exception as e:
                    # Best-effort broadcast: drop the broken connection and
                    # keep serving the remaining ones.
                    print(f"Failed to send message: {e}")
                    if conn in active_connections:
                        active_connections.remove(conn)
    finally:
        await consumer.stop()
        print("Kafka consumer stopped.")
# Data model for the topic change request.
# Used as the request body of POST /change_topic.
class TopicChange(BaseModel):
    # Name of the Kafka topic to switch to. The endpoint strips surrounding
    # whitespace and rejects the request when the result is empty.
    topic: str
# Endpoint to change the Kafka topic.
@app.post("/change_topic")
async def change_topic(topic_change: TopicChange):
    """Switch the background consumer to another Kafka topic.

    The running consumer task (if any) is cancelled and awaited before a new
    one is started for the requested topic. When Kafka is not configured only
    ``current_topic`` is updated.

    Args:
        topic_change: Request body carrying the new topic name.

    Returns:
        A dict with a ``message`` key describing the outcome.

    Raises:
        HTTPException: 400 when the topic is empty after stripping whitespace.
    """
    global current_topic, consumer_task
    new_topic = topic_change.topic.strip()
    if not new_topic:
        raise HTTPException(status_code=400, detail="Topic cannot be empty")
    if new_topic == current_topic:
        return {"message": "Topic unchanged"}
    # Cancel the current consumer task if it exists.
    if consumer_task:
        consumer_task.cancel()
        try:
            await consumer_task
        except asyncio.CancelledError:
            print("Previous consumer task cancelled")
        except Exception as e:
            # The previous task had already terminated with an error (or
            # failed while shutting down). Awaiting it re-raises that error;
            # letting it propagate would turn every later topic change into
            # a 500 and leave the service stuck on a dead consumer.
            print(f"Previous consumer task ended with an error: {e}")
    current_topic = new_topic
    if KAFKA_AVAILABLE:
        consumer_task = asyncio.create_task(consume_kafka_messages(current_topic))
    return {"message": f"Topic changed to {new_topic}"}
# Endpoint to list all available topics, ensure the default topic exists,
# and filter out internal topics (like __consumer_offsets).
@app.get("/topics")
async def get_topics():
    """List the user-visible Kafka topics, creating the default topic if needed.

    Returns:
        ``{"topics": [...]}`` with topics whose names start with ``"__"``
        removed, the default topic first and the remaining names in sorted
        order.

    Raises:
        HTTPException: 503 when Kafka is not configured; 500 when the default
            topic cannot be created or the topic list cannot be retrieved.
    """
    if not KAFKA_AVAILABLE:
        raise HTTPException(status_code=503, detail="Kafka not configured.")
    admin_client = AIOKafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
    await admin_client.start()
    try:
        topics_set = set(await admin_client.list_topics())
        # Create the default topic if it doesn't already exist.
        if DEFAULT_TOPIC not in topics_set:
            new_topic = NewTopic(DEFAULT_TOPIC, num_partitions=1, replication_factor=1)
            try:
                await admin_client.create_topics([new_topic])
            except Exception as e:
                raise HTTPException(
                    status_code=500,
                    detail=f"Failed to create default topic: {e}",
                ) from e
            topics_set.add(DEFAULT_TOPIC)
            print(f"Default topic '{DEFAULT_TOPIC}' created.")
        # Filter out internal topics (those starting with '__'). Sorting gives
        # a stable response; plain set iteration order is arbitrary.
        filtered_topics = sorted(t for t in topics_set if not t.startswith("__"))
        # Ensure the default topic is the first in the list.
        if DEFAULT_TOPIC in filtered_topics:
            filtered_topics.remove(DEFAULT_TOPIC)
            filtered_topics.insert(0, DEFAULT_TOPIC)
        return {"topics": filtered_topics}
    except HTTPException:
        # Already a well-formed HTTP error (e.g. the topic-creation failure
        # above); re-raise it untouched instead of wrapping it a second time.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get topics: {e}") from e
    finally:
        await admin_client.close()
# Serve index.html at the root.
@app.get("/")
async def read_index():
    """Return the landing page, or an HTML error page when it cannot be served.

    Responds with 503 when Kafka is not configured, with 404 when
    ``public/index.html`` does not exist, and otherwise with the file's
    contents decoded as UTF-8.
    """
    if KAFKA_AVAILABLE:
        from pathlib import Path

        page = Path("public", "index.html")
        if page.exists():
            html = page.read_text(encoding="utf-8")
            return HTMLResponse(html)
        return HTMLResponse("<h1>index.html not found</h1>", status_code=404)

    # Kafka is not configured: explain how to enable the application.
    return HTMLResponse(
        "<h1>Error: KAFKA_BOOTSTRAP_SERVERS env var is not set.</h1>"
        "<p>Please configure this environment variable to enable the application.</p>",
        status_code=503,
    )
# WebSocket endpoint for real-time messages.
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a WebSocket client so it receives the broadcast Kafka messages.

    The connection is closed immediately when Kafka is not configured.
    Otherwise it is accepted, added to ``active_connections`` and kept open
    until the client disconnects; incoming text frames are read and discarded.

    Args:
        websocket: The incoming WebSocket connection.
    """
    if not KAFKA_AVAILABLE:
        await websocket.close()
        return
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            # Keeping connection open—no processing of client messages.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal termination: the client went away.
        pass
    finally:
        # Always unregister, whatever ended the loop. The membership check is
        # needed because the consumer loop may already have removed this
        # socket after a failed send, in which case remove() would raise
        # ValueError.
        if websocket in active_connections:
            active_connections.remove(websocket)
# On startup, launch the Kafka consumer for the default topic if Kafka is configured.
@app.on_event("startup")
async def startup_event():
    """Start the background consumer task for ``current_topic`` at application startup.

    When Kafka is not configured no task is created and only a notice is printed.
    """
    global consumer_task

    # Guard clause: nothing to start without a broker address.
    if not KAFKA_AVAILABLE:
        print("Startup event: Kafka is not configured; HTML and consumer are disabled.")
        return

    consumer_coro = consume_kafka_messages(current_topic)
    consumer_task = asyncio.create_task(consumer_coro)
    print("Startup event: Kafka consumer task created.")
# HELP! My neighbor's favorite topic is pets, but he always gets my cat's name wrong!
# I've suggested he look on pk-kafka-kafka-bootstrap.pk-world.svc.cluster.local for my cat's name.
# Think you can find my cat's name?