60 lines
1.3 KiB
Python
60 lines
1.3 KiB
Python
import json
|
|
import threading
|
|
from time import time
|
|
|
|
import pika
|
|
from django.conf import settings
|
|
from pika.exceptions import AMQPError
|
|
|
|
__all__ = ["EventPoster", "post", "last"]
|
|
|
|
|
|
class EventPoster(object):
|
|
def __init__(self):
|
|
self._connect()
|
|
self._exchange = settings.EVENT_DAEMON_AMQP_EXCHANGE
|
|
|
|
def _connect(self):
|
|
self._conn = pika.BlockingConnection(
|
|
pika.URLParameters(settings.EVENT_DAEMON_AMQP)
|
|
)
|
|
self._chan = self._conn.channel()
|
|
|
|
def post(self, channel, message, tries=0):
|
|
try:
|
|
id = int(time() * 1000000)
|
|
self._chan.basic_publish(
|
|
self._exchange,
|
|
"",
|
|
json.dumps({"id": id, "channel": channel, "message": message}),
|
|
)
|
|
return id
|
|
except AMQPError:
|
|
if tries > 10:
|
|
raise
|
|
self._connect()
|
|
return self.post(channel, message, tries + 1)
|
|
|
|
|
|
_local = threading.local()
|
|
|
|
|
|
def _get_poster():
|
|
if "poster" not in _local.__dict__:
|
|
_local.poster = EventPoster()
|
|
return _local.poster
|
|
|
|
|
|
def post(channel, message):
|
|
try:
|
|
return _get_poster().post(channel, message)
|
|
except AMQPError:
|
|
try:
|
|
del _local.poster
|
|
except AttributeError:
|
|
pass
|
|
return 0
|
|
|
|
|
|
def last():
|
|
return int(time() * 1000000)
|