#!/usr/bin/python3
# coding:utf-8
import uuid
import aio_pika
import asyncio
import json
import logging
import traceback
from datetime import datetime
# import pika
from aio_pika.pool import Pool
from aio_pika import Message, DeliveryMode, IncomingMessage, ExchangeType
from retrying import retry
from getConfig import MQConfig
# Alias for the configuration object imported from the project's getConfig
# module; top-level options such as "Qos" are looked up on it with .get().
RabbitMQConfig = MQConfig
# The 'MQ' entry of that configuration. The connection factories in this file
# read 'host', 'user', 'password', 'virtual_host' and 'port' from it.
RabbitMQ = RabbitMQConfig.get('MQ')
# NOTE(review): `retrying.retry` wraps a plain synchronous call. Applied to a
# coroutine function it presumably only guards the creation of the coroutine
# object, so errors raised while the coroutine is awaited are likely NOT
# retried -- confirm, and consider an asyncio-aware retry loop if reconnect
# behaviour is required.
@retry()
async def MonitorMQRequest(loop, queue_name, callback):
    """Consume messages from a durable queue and pass each one to ``callback``.

    Each message body is decoded, parsed as JSON and handed to
    ``callback(data_dict, request_id)``. ``request_id`` is the payload's
    ``'Id'`` value, or a freshly generated ``uuid.uuid1()`` when ``'Id'`` is
    missing, ``None`` or an empty string.

    Every message is acknowledged whether or not processing succeeded
    (best-effort delivery); failures are logged instead of being silently
    discarded.

    Args:
        loop: Event loop used for the connection/channel pools and for the
            consumer task.
        queue_name: Name of the queue to declare (durable, not auto-delete)
            and consume from.
        callback: Callable taking ``(data_dict, request_id)``. It may be a
            regular function or return an awaitable; awaitables are awaited
            before the message is acknowledged.
    """
    import inspect

    logger = logging.getLogger(__name__)

    async def get_connection() -> aio_pika.Connection:
        return await aio_pika.connect_robust(
            host=RabbitMQ.get('host'),
            login=RabbitMQ.get('user'),
            password=RabbitMQ.get('password'),
            virtualhost=RabbitMQ.get('virtual_host'),
            port=RabbitMQ.get('port'),
        )

    connection_pool = Pool(get_connection, max_size=2, loop=loop)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:  # type: aio_pika.Connection
            return await connection.channel()

    channel_pool = Pool(get_channel, max_size=10, loop=loop)

    def decode_body(raw_body):
        """Parse a raw message body into a Python object."""
        text = raw_body.decode()
        try:
            # Strict JSON first, so apostrophes inside valid JSON strings
            # are left untouched.
            return json.loads(text)
        except ValueError:
            # Some payloads arrive with single-quoted keys/values, which
            # json.loads rejects; fall back to the original quote swap.
            return json.loads(text.replace("'", '"'))

    async def handle(message):
        """Run the callback for one message, then acknowledge it."""
        try:
            data_dict = decode_body(message.body)
            request_id = data_dict.get('Id')
            if request_id is None or request_id == '':
                request_id = uuid.uuid1()
            result = callback(data_dict, request_id)
            if inspect.isawaitable(result):
                await result
        except Exception:
            # Best-effort consumer: the message is still acknowledged below,
            # but the failure is recorded rather than swallowed.
            logger.exception(
                "[MonitorMQRequest] Failed to process message from queue %s",
                queue_name,
            )
        await message.ack()

    async def consume():
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(RabbitMQConfig.get("Qos", 20))
            queue = await channel.declare_queue(
                queue_name, durable=True, auto_delete=False
            )
            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    await handle(message)

    async with connection_pool, channel_pool:
        await loop.create_task(consume())
# NOTE(review): `retrying.retry` wraps a plain synchronous call. Applied to a
# coroutine function it presumably only guards the creation of the coroutine
# object, so errors raised while the coroutine is awaited are likely NOT
# retried -- confirm.
@retry()
async def SendMQMessage(loop, routing_key, queue_name, message,
                        delivery_mode=1, exchange=None):
    """Publish ``message`` as JSON to a durable topic exchange.

    Args:
        loop: Event loop used for the connection/channel pools and for the
            publishing task.
        routing_key: Routing key the message is published with.
        queue_name: Set as the ``reply_to`` property of the message.
        message: JSON-serialisable payload.
        delivery_mode: ``1`` for persistent (default), ``0`` for
            non-persistent; any other value falls back to persistent.
        exchange: Name of the topic exchange to declare and publish to.
            When omitted, the ``'exchange'`` entry of the MQ configuration
            is used.

    Raises:
        ValueError: If no exchange name is given and none is configured.
    """
    logger = logging.getLogger(__name__)

    exchange_name = exchange
    if exchange_name is None:
        # NOTE(review): the original code referenced an undefined name
        # `exchange`. The 'exchange' config key is an assumption -- confirm
        # it against the getConfig schema.
        exchange_name = RabbitMQ.get('exchange')
    if exchange_name is None:
        raise ValueError(
            "SendMQMessage requires an exchange name: pass `exchange=` "
            "or configure one"
        )

    def delivery_mode_value(_delivery_mode: int) -> DeliveryMode:
        modes = {
            1: DeliveryMode.PERSISTENT,
            0: DeliveryMode.NOT_PERSISTENT,
        }
        return modes.get(_delivery_mode, DeliveryMode.PERSISTENT)

    async def get_connection() -> aio_pika.Connection:
        return await aio_pika.connect_robust(
            host=RabbitMQ.get('host'),
            login=RabbitMQ.get('user'),
            virtualhost=RabbitMQ.get('virtual_host'),
            password=RabbitMQ.get('password'),
            port=RabbitMQ.get('port'),
        )

    connection_pool = Pool(get_connection, max_size=2, loop=loop)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:  # type: aio_pika.Connection
            return await connection.channel(publisher_confirms=True)

    channel_pool = Pool(get_channel, max_size=10, loop=loop)

    async def publish():
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            target_exchange = await channel.declare_exchange(
                exchange_name, ExchangeType.TOPIC, durable=True
            )
            channel_message = Message(
                body=json.dumps(message).encode(),
                delivery_mode=delivery_mode_value(delivery_mode),
                reply_to=queue_name,
                # A fresh id per message; the original passed the class
                # attribute `IncomingMessage.correlation_id`, which is not
                # an id value.
                correlation_id=str(uuid.uuid4()),
            )
            await target_exchange.publish(
                message=channel_message,
                routing_key=routing_key,
                mandatory=True,
            )
            logger.info("[Channel_message] Sent [{0}]--{1} ".format(message, channel_message))

    async with connection_pool, channel_pool:
        await loop.create_task(publish())
# Last updated: 2021-11-16 10:51:44
# 0 comments