NDOJ/judge/event_poster_amqp.py

61 lines
1.3 KiB
Python
Raw Permalink Normal View History

2020-01-21 06:35:58 +00:00
import json
import threading
from time import time
import pika
from django.conf import settings
from pika.exceptions import AMQPError
2022-05-14 17:57:27 +00:00
# Names exported by ``from <this module> import *``: the poster class plus the
# module-level ``post`` and ``last`` helpers.
__all__ = ["EventPoster", "post", "last"]
2020-01-21 06:35:58 +00:00
class EventPoster(object):
    """Publish JSON-encoded events to an AMQP exchange via a blocking pika connection.

    The broker URL is read from ``settings.EVENT_DAEMON_AMQP`` and the target
    exchange from ``settings.EVENT_DAEMON_AMQP_EXCHANGE``.
    """

    def __init__(self):
        """Connect to the broker and remember the exchange to publish to."""
        self._connect()
        self._exchange = settings.EVENT_DAEMON_AMQP_EXCHANGE

    def _connect(self):
        """Open a fresh connection and channel, releasing any previous connection.

        Raises:
            AMQPError: if the new connection or channel cannot be established.
        """
        # On a reconnect the old connection would otherwise stay open until it
        # is garbage collected; close it so retries do not pile up connections.
        stale = getattr(self, "_conn", None)
        if stale is not None and stale.is_open:
            try:
                stale.close()
            except AMQPError:
                # Best effort only: the old connection is being discarded anyway.
                pass

        self._conn = pika.BlockingConnection(
            pika.URLParameters(settings.EVENT_DAEMON_AMQP)
        )
        self._chan = self._conn.channel()

    def post(self, channel, message, tries=0):
        """Publish ``message`` for ``channel`` and return the generated event id.

        The body sent to the exchange is the JSON object
        ``{"id": ..., "channel": ..., "message": ...}``, published with an empty
        routing key. The id is the current time in integer microseconds and is
        regenerated on every attempt.

        Args:
            channel: Value stored under the ``"channel"`` key of the payload.
            message: JSON-serialisable value stored under the ``"message"`` key.
            tries: Number of failed attempts already made; callers normally
                leave this at ``0``.

        Returns:
            The integer id embedded in the published payload.

        Raises:
            AMQPError: if publishing still fails once the attempt counter
                exceeds 10, or if reconnecting itself fails.
        """
        attempt = tries
        while True:
            try:
                event_id = int(time() * 1000000)
                payload = json.dumps(
                    {"id": event_id, "channel": channel, "message": message}
                )
                self._chan.basic_publish(self._exchange, "", payload)
                return event_id
            except AMQPError:
                if attempt > 10:
                    raise
                # Reconnect and try again with a fresh channel.
                self._connect()
                attempt += 1
# Thread-local storage holding one lazily created EventPoster per thread.
_local = threading.local()


def _get_poster():
    """Return the calling thread's EventPoster, creating it on first use."""
    already_created = hasattr(_local, "poster")
    if not already_created:
        _local.poster = EventPoster()
    return _local.poster
def post(channel, message):
    """Publish an event through the current thread's poster.

    Returns the id reported by ``EventPoster.post``. If an ``AMQPError``
    escapes, the thread's cached poster is discarded (so the next call builds
    a new one) and ``0`` is returned instead of raising.
    """
    try:
        event_id = _get_poster().post(channel, message)
    except AMQPError:
        # The poster may never have been stored if creating it was what failed.
        if hasattr(_local, "poster"):
            del _local.poster
        return 0
    return event_id
def last():
    """Return the current time as an integer number of microseconds.

    This is the same scale used for the ids generated in ``EventPoster.post``.
    """
    microseconds = time() * 1000000
    return int(microseconds)