Implementation of the Redis pub/sub pattern?

I'm trying to implement a WebSocket chat on Tornado. I decided to use Redis to deliver new messages to the handler, but listen() hangs the main thread, and the existing implementations (brukva, toredis and others) are just terrible. Somewhere on GitHub I saw an elegant solution that used an additional thread alongside the IOLoop. I would be grateful for any help with this.

2 Comments

Sergey Gornostaev, when DRF saves to the database it also writes an entry to Redis of the form thread_id: message.
The React frontend opens a WebSocket to Tornado, which subscribes to message updates in Redis.
The subscribe step is the one I can't get working.
import json

import redis
import tornado.websocket


class EchoWebSocket(tornado.websocket.WebSocketHandler):

    def open(self):
        print("WebSocket opened")
        redis_client = redis.StrictRedis()
        sub = redis_client.pubsub()
        sub.subscribe('channel')
        for message in sub.listen():  # I can't put this into the main loop
            # redis-py yields each message as a dict; the payload is under 'data'
            print(message['data'])

    def send_new_message(self, evt):
        print(evt.body)

    def check_origin(self, origin):
        return True

    def on_message(self, data):
        json_data = json.loads(data)
        self.token = json_data['token']
        self.message = json_data['message']
        self.thread = json_data['thread']
        self.post()
Could you explain why you need Redis?

1 Answer

Best answer
The brukva and toredis libraries have not been updated for several years. But in its latest update Tornado finally switched to the asyncio event loop, which makes it possible to use a modern, well-maintained library. For example, aioredis:
import asyncio

import aioredis  # this example uses the aioredis 1.x API (create_redis, Channel)
from tornado import web, websocket
from tornado.ioloop import IOLoop

# Every open WebSocket connection, so the consumer can broadcast to all of them.
connections = []


class WSHandler(websocket.WebSocketHandler):
    def open(self):
        connections.append(self)

    def on_message(self, message):
        ...

    def on_close(self):
        connections.remove(self)


class GetHandler(web.RequestHandler):
    def get(self):
        self.render("chat.html")


async def consumer(channel):
    # Wait for messages without blocking the event loop, then fan them out.
    while await channel.wait_message():
        msg = await channel.get(encoding='utf-8')
        # Iterate over a copy: a socket may close while we await a write.
        for connection in list(connections):
            await connection.write_message(msg)


async def setup():
    connection = await aioredis.create_redis('redis://localhost')
    # subscribe() returns a list of channels, one per name passed in.
    channel, = await connection.subscribe('notifications')
    asyncio.ensure_future(consumer(channel))


application = web.Application([
    (r'/', GetHandler),
    (r'/chat/', WSHandler),
])


if __name__ == '__main__':
    application.listen(8000)
    loop = IOLoop.current()
    loop.add_callback(setup)
    loop.start()

Naturally, this is a maximally simplified example. In real code you should not keep the connections in a global variable, and when the server shuts down you should unsubscribe from the channel and close the connection to Redis.
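To illustrate the two caveats above, here is a minimal sketch that needs neither Redis nor Tornado, so it can be run as is. Everything in it is an assumption made for illustration: ConnectionRegistry, shutdown, FakeSocket and demo are hypothetical names, an asyncio.Queue stands in for the Redis channel, and FakeSocket stands in for a WebSocketHandler. It only shows the shape of the idea: the connections live in an object instead of a global list, and the consumer task is stopped explicitly on shutdown.

```python
import asyncio


class ConnectionRegistry:
    """Holds open sockets; replaces a module-level list of connections."""

    def __init__(self):
        self._sockets = set()

    def add(self, socket):
        self._sockets.add(socket)

    def discard(self, socket):
        self._sockets.discard(socket)

    async def broadcast(self, message):
        # Iterate over a snapshot: sockets may close while we await a write.
        for socket in list(self._sockets):
            try:
                await socket.write_message(message)
            except Exception:
                # Drop sockets that can no longer be written to.
                self.discard(socket)


async def consumer(queue, registry):
    """Forward every message from the queue to all registered sockets."""
    while True:
        message = await queue.get()
        await registry.broadcast(message)
        queue.task_done()


async def shutdown(task):
    """Cancel the consumer task and wait until it has actually stopped."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # With aioredis 1.x, this is where one would also unsubscribe and close:
    #   await connection.unsubscribe('notifications')
    #   connection.close()
    #   await connection.wait_closed()


class FakeSocket:
    """Hypothetical stand-in for a WebSocketHandler; records what it is sent."""

    def __init__(self):
        self.received = []

    async def write_message(self, message):
        self.received.append(message)


async def demo():
    registry = ConnectionRegistry()
    first, second = FakeSocket(), FakeSocket()
    registry.add(first)
    registry.add(second)

    queue = asyncio.Queue()  # stand-in for the Redis channel
    task = asyncio.ensure_future(consumer(queue, registry))

    await queue.put('hello')
    await queue.join()        # wait until the consumer has broadcast it

    registry.discard(second)  # what on_close() would do
    await queue.put('bye')
    await queue.join()

    await shutdown(task)
    return first.received, second.received, task.cancelled()
```

In a Tornado application the registry could be created once at startup and passed to the handlers (for example through the initialize() arguments in the route table), with open() calling add() and on_close() calling discard().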