Try using redis streams instead of rabbitmq for spark #2957

Open · wants to merge 1 commit into base: master
7 changes: 7 additions & 0 deletions consul_config.py.ctmpl
@@ -169,6 +169,13 @@ PLAYING_NOW_EXCHANGE = '''{{template "KEY" "playing_now_exchange"}}'''
PLAYING_NOW_QUEUE = '''{{template "KEY" "playing_now_queue"}}'''
SPOTIFY_METADATA_QUEUE = '''{{template "KEY" "spotify_metadata_queue"}}'''

# redis streams
SPARK_REQUEST_STREAM = "listenbrainz_spark:requests"
SPARK_REQUEST_STREAM_LAST_SEEN = "listenbrainz_spark:last_seen:requests"
SPARK_RESULT_STREAM = "listenbrainz_spark:results"
SPARK_RESULT_STREAM_LAST_SEEN = "listenbrainz_spark:last_seen:results"
SPARK_RESULT_STREAM_LEN = 50000

SPARK_RESULT_EXCHANGE = '''{{template "KEY" "spark_result_exchange"}}'''
SPARK_RESULT_QUEUE = '''{{template "KEY" "spark_result_queue"}}'''
SPARK_REQUEST_EXCHANGE = '''{{template "KEY" "spark_request_exchange"}}'''
7 changes: 7 additions & 0 deletions listenbrainz/config.py.sample
@@ -42,6 +42,13 @@ REDIS_HOST = "redis"
REDIS_PORT = 6379
REDIS_NAMESPACE = "listenbrainz"

# redis streams
SPARK_REQUEST_STREAM = "listenbrainz_spark:requests"
SPARK_REQUEST_STREAM_LAST_SEEN = "listenbrainz_spark:last_seen:requests"
SPARK_RESULT_STREAM = "listenbrainz_spark:results"
SPARK_RESULT_STREAM_LAST_SEEN = "listenbrainz_spark:last_seen:results"
SPARK_RESULT_STREAM_LEN = 50000

# RabbitMQ
RABBITMQ_HOST = "rabbitmq"
RABBITMQ_PORT = 5672
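
The same stream settings are added to both the consul template and the sample config. Below is a minimal sketch of how they could be exercised with redis-py; the stream names and the 50000 trim length come from the values above, while the connection details and the query name are illustrative assumptions, not part of this diff.

import orjson
from redis import Redis

redis_conn = Redis("redis", 6379)

# Producer side: append one request entry; stream fields are byte strings.
redis_conn.xadd("listenbrainz_spark:requests", {
    b"query": b"stats.user.entity",                  # hypothetical query name
    b"params": orjson.dumps({"entity": "artists"}),
})

# Housekeeping: keep roughly the newest SPARK_RESULT_STREAM_LEN entries;
# approximate trimming is cheaper than an exact MAXLEN.
redis_conn.xtrim("listenbrainz_spark:results", maxlen=50000, approximate=True)
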
47 changes: 1 addition & 46 deletions listenbrainz/spark/background.py
@@ -1,7 +1,5 @@
import json
import logging
from queue import Queue, Empty
from threading import Thread

import orjson
import sentry_sdk
@@ -72,52 +70,9 @@ class BackgroundJobProcessor:

def __init__(self, app):
self.app = app
self.thread = None
self.done = False
self.internal_message_queue = Queue()
self.internal_message_ack_queue = Queue()

self.response_handlers = {}
self.register_handlers()

def terminate(self):
""" Stop the background job processor and its thread """
self.done = True
self.thread.join()

def enqueue(self, message):
""" Add a message for processing to internal queue """
self.internal_message_queue.put(message, block=False)

def pending_acks(self):
""" Add a processed message to internal queue for acknowledging the message to rabbitmq. """
messages = []
while True:
try:
message = self.internal_message_ack_queue.get(block=False)
messages.append(message)
except Empty:
break
return messages

def run(self):
""" Infinite loop that keeps processing messages enqueued in the internal message queue and puts them on
ack queue if successfully processed.
"""
with self.app.app_context():
while not self.done:
try:
message = self.internal_message_queue.get(block=True, timeout=5)
self.process_message(message)
self.internal_message_ack_queue.put(message)
except Empty:
self.app.logger.debug("Empty internal message queue")

def start(self):
""" Start running the background job processor in its own thread """
self.thread = Thread(target=self.run, name="SparkReaderBackgroundJobProcessor")
self.thread.start()

def register_handlers(self):
""" Register handlers for the Spark reader """
datasets = [
@@ -170,7 +125,7 @@ def register_handlers(self):
def process_message(self, message):
""" Process a message received by the spark reader """
try:
response = orjson.loads(message.body)
response = orjson.loads(message[b"result"])
except Exception:
self.app.logger.error("Error processing message: %s", message)
return
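
The only functional change to the processor is that process_message() now pulls the JSON payload from the b"result" field of a Redis stream entry instead of from a kombu Message body. A small sketch of the entry shape this implies, given an already constructed BackgroundJobProcessor named processor; the payload contents are made up for illustration.

import orjson

# redis-py returns each stream entry's fields as a dict keyed by bytes; the
# JSON document produced by the Spark side is assumed to live under b"result".
entry_fields = {
    b"result": orjson.dumps({"type": "some_result_type", "data": []}),  # hypothetical payload
}

# Equivalent of what SparkReader.run() does for every entry it reads.
processor.process_message(entry_fields)
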
37 changes: 12 additions & 25 deletions listenbrainz/spark/request_manage.py
@@ -5,7 +5,7 @@
import click
import orjson
from kombu import Connection
from kombu.entity import PERSISTENT_DELIVERY_MODE, Exchange
from redis.client import Redis

from listenbrainz.troi.weekly_playlists import get_users_for_weekly_playlists
from listenbrainz.utils import get_fallback_connection_name
@@ -49,41 +49,28 @@ def _prepare_query_message(query, **params):
if query not in possible_queries:
raise InvalidSparkRequestError(query)

message = {'query': possible_queries[query]['name']}
required_params = set(possible_queries[query]['params'])
given_params = set(params.keys())
if required_params != given_params:
raise InvalidSparkRequestError

if params:
message['params'] = {}
for key, value in params.items():
message['params'][key] = value

return orjson.dumps(message)
message = {
b"query": query,
b"params": orjson.dumps(params or {}),
}
return message


def send_request_to_spark_cluster(query, **params):
app = create_app()
with app.app_context():
message = _prepare_query_message(query, **params)
connection = Connection(
hostname=app.config["RABBITMQ_HOST"],
userid=app.config["RABBITMQ_USERNAME"],
port=app.config["RABBITMQ_PORT"],
password=app.config["RABBITMQ_PASSWORD"],
virtual_host=app.config["RABBITMQ_VHOST"],
transport_options={"client_properties": {"connection_name": get_fallback_connection_name()}}
)
producer = connection.Producer()
spark_request_exchange = Exchange(app.config["SPARK_REQUEST_EXCHANGE"], "fanout", durable=False)
producer.publish(
message,
routing_key="",
exchange=spark_request_exchange,
delivery_mode=PERSISTENT_DELIVERY_MODE,
declare=[spark_request_exchange]
redis_conn = Redis(
app.config["REDIS_HOST"],
app.config["REDIS_PORT"],
client_name=get_fallback_connection_name()
)
message = _prepare_query_message(query, **params)
redis_conn.xadd(app.config["SPARK_REQUEST_STREAM"], message)


@cli.command(name="request_user_stats")
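
With this change, send_request_to_spark_cluster() XADDs the request onto SPARK_REQUEST_STREAM instead of publishing to the RabbitMQ request exchange. The consumer side is not part of this diff; the following is a sketch under the assumption that the Spark request consumer drains the same stream with XREAD and decodes the orjson-encoded params field.

import orjson
from redis import Redis

redis_conn = Redis("redis", 6379)
last_id = b"0-0"

while True:
    # Block up to 5 seconds waiting for new request entries.
    streams = redis_conn.xread({"listenbrainz_spark:requests": last_id}, count=1, block=5000)
    if not streams:
        continue
    for entry_id, fields in streams[0][1]:
        query = fields[b"query"].decode("utf-8")
        params = orjson.loads(fields[b"params"])
        # handle_request(query, params)  # hypothetical dispatch into the Spark request handlers
        last_id = entry_id
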
70 changes: 29 additions & 41 deletions listenbrainz/spark/spark_reader.py
@@ -1,8 +1,7 @@
#!/usr/bin/env python3
import time

from kombu import Connection, Message, Consumer, Exchange, Queue
from kombu.mixins import ConsumerMixin
from redis.client import Redis

from listenbrainz.spark.background import BackgroundJobProcessor
from listenbrainz.utils import get_fallback_connection_name
@@ -11,67 +10,56 @@
PREFETCH_COUNT = 1000


class SparkReader(ConsumerMixin):
class SparkReader:

def __init__(self, app):
self.app = app
self.connection: Connection | None = None
self.spark_result_exchange = Exchange(app.config["SPARK_RESULT_EXCHANGE"], "fanout", durable=False)
self.spark_result_queue = Queue(app.config["SPARK_RESULT_QUEUE"], exchange=self.spark_result_exchange,
durable=True)
self.response_handlers = {}
self.redis_conn: Redis | None = None
self.processor: BackgroundJobProcessor | None = None
self.last_seen_id = b"0-0"

def callback(self, message: Message):
""" Handle the data received from the queue and insert into the database accordingly. """
self.app.logger.debug("Received a message, adding to internal processing queue...")
self.processor.enqueue(message)
def run(self):
while True:
streams = self.redis_conn.xread(
{self.app.config["SPARK_RESULT_STREAM"]: self.last_seen_id},
count=PREFETCH_COUNT,
block=5000
)
if not streams:
continue

def on_iteration(self):
""" Executed periodically in the main consumption loop by kombu, we check for completed messages here
and acknowledge them. """
for message in self.processor.pending_acks():
message.ack()
self.app.logger.info('Received a request!')
messages = streams[0][1]
message_id, body = messages[0]
with self.app.app_context():
self.processor.process_message(body)

def get_consumers(self, _, channel):
return [Consumer(
channel,
prefetch_count=PREFETCH_COUNT,
queues=[self.spark_result_queue],
on_message=lambda msg: self.callback(msg)
)]
self.last_seen_id = message_id
self.redis_conn.xtrim(self.app.config["SPARK_RESULT_STREAM"], minid=self.last_seen_id)
self.redis_conn.set(self.app.config["SPARK_RESULT_STREAM_LAST_SEEN"], self.last_seen_id)
self.app.logger.info('Request done!')

def init_rabbitmq_connection(self):
self.connection = Connection(
hostname=self.app.config["RABBITMQ_HOST"],
userid=self.app.config["RABBITMQ_USERNAME"],
port=self.app.config["RABBITMQ_PORT"],
password=self.app.config["RABBITMQ_PASSWORD"],
virtual_host=self.app.config["RABBITMQ_VHOST"],
transport_options={"client_properties": {"connection_name": get_fallback_connection_name()}}
def init_redis_connection(self):
self.redis_conn = Redis(
self.app.config["REDIS_HOST"],
self.app.config["REDIS_PORT"],
client_name=get_fallback_connection_name()
)
self.last_seen_id = self.redis_conn.get(self.app.config["SPARK_RESULT_STREAM_LAST_SEEN"]) or b"0-0"

def start(self):
""" initiates RabbitMQ connection and starts consuming from the queue """
while True:
try:
self.app.logger.info("Spark consumer has started!")
self.init_rabbitmq_connection()

self.init_redis_connection()
self.processor = BackgroundJobProcessor(self.app)
self.processor.start()

self.run()
self.processor.terminate()
except KeyboardInterrupt:
self.app.logger.error("Keyboard interrupt!")
if self.processor is not None:
self.processor.terminate()
break
except Exception:
self.app.logger.error("Error in SparkReader:", exc_info=True)
if self.processor is not None:
self.processor.terminate()
time.sleep(3)


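
Because run() stores the last processed entry ID in SPARK_RESULT_STREAM_LAST_SEEN and trims everything older via XTRIM with minid, a restarted reader resumes from where it stopped rather than reprocessing the whole stream. A quick way to inspect that bookkeeping against a local Redis, using the key names from the config above (the connection details are assumptions):

from redis import Redis

redis_conn = Redis("redis", 6379)

# Pointer persisted by SparkReader.run(); unset (or b"0-0") before any result
# has been processed.
print(redis_conn.get("listenbrainz_spark:last_seen:results"))

# After XTRIM with minid=last_seen_id, only entries with IDs at or above the
# last processed one should remain on the result stream.
print(redis_conn.xlen("listenbrainz_spark:results"))
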