From 17b5be250cb5ecf95cf4f77c8c930450d876efa6 Mon Sep 17 00:00:00 2001 From: dzen Date: Thu, 12 Mar 2020 08:32:29 +0100 Subject: [PATCH 1/3] Moves to pamqp 3.0 --- aioamqp/channel.py | 114 +++++++++++++++++++------------------- aioamqp/frame.py | 1 - aioamqp/protocol.py | 22 ++++---- aioamqp/tests/testcase.py | 2 +- 4 files changed, 69 insertions(+), 70 deletions(-) diff --git a/aioamqp/channel.py b/aioamqp/channel.py index 7f0f402..ea164c4 100644 --- a/aioamqp/channel.py +++ b/aioamqp/channel.py @@ -9,7 +9,7 @@ from itertools import count import warnings -import pamqp.specification +import pamqp.commands from . import frame as amqp_frame from . import exceptions @@ -78,35 +78,35 @@ def connection_closed(self, server_code=None, server_reason=None, exception=None async def dispatch_frame(self, frame): methods = { - pamqp.specification.Channel.OpenOk.name: self.open_ok, - pamqp.specification.Channel.FlowOk.name: self.flow_ok, - pamqp.specification.Channel.CloseOk.name: self.close_ok, - pamqp.specification.Channel.Close.name: self.server_channel_close, - - pamqp.specification.Exchange.DeclareOk.name: self.exchange_declare_ok, - pamqp.specification.Exchange.BindOk.name: self.exchange_bind_ok, - pamqp.specification.Exchange.UnbindOk.name: self.exchange_unbind_ok, - pamqp.specification.Exchange.DeleteOk.name: self.exchange_delete_ok, - - pamqp.specification.Queue.DeclareOk.name: self.queue_declare_ok, - pamqp.specification.Queue.DeleteOk.name: self.queue_delete_ok, - pamqp.specification.Queue.BindOk.name: self.queue_bind_ok, - pamqp.specification.Queue.UnbindOk.name: self.queue_unbind_ok, - pamqp.specification.Queue.PurgeOk.name: self.queue_purge_ok, - - pamqp.specification.Basic.QosOk.name: self.basic_qos_ok, - pamqp.specification.Basic.ConsumeOk.name: self.basic_consume_ok, - pamqp.specification.Basic.CancelOk.name: self.basic_cancel_ok, - pamqp.specification.Basic.GetOk.name: self.basic_get_ok, - pamqp.specification.Basic.GetEmpty.name: self.basic_get_empty, - pamqp.specification.Basic.Deliver.name: self.basic_deliver, - pamqp.specification.Basic.Cancel.name: self.server_basic_cancel, - pamqp.specification.Basic.Ack.name: self.basic_server_ack, - pamqp.specification.Basic.Nack.name: self.basic_server_nack, - pamqp.specification.Basic.RecoverOk.name: self.basic_recover_ok, - pamqp.specification.Basic.Return.name: self.basic_return, - - pamqp.specification.Confirm.SelectOk.name: self.confirm_select_ok, + pamqp.commands.Channel.OpenOk.name: self.open_ok, + pamqp.commands.Channel.FlowOk.name: self.flow_ok, + pamqp.commands.Channel.CloseOk.name: self.close_ok, + pamqp.commands.Channel.Close.name: self.server_channel_close, + + pamqp.commands.Exchange.DeclareOk.name: self.exchange_declare_ok, + pamqp.commands.Exchange.BindOk.name: self.exchange_bind_ok, + pamqp.commands.Exchange.UnbindOk.name: self.exchange_unbind_ok, + pamqp.commands.Exchange.DeleteOk.name: self.exchange_delete_ok, + + pamqp.commands.Queue.DeclareOk.name: self.queue_declare_ok, + pamqp.commands.Queue.DeleteOk.name: self.queue_delete_ok, + pamqp.commands.Queue.BindOk.name: self.queue_bind_ok, + pamqp.commands.Queue.UnbindOk.name: self.queue_unbind_ok, + pamqp.commands.Queue.PurgeOk.name: self.queue_purge_ok, + + pamqp.commands.Basic.QosOk.name: self.basic_qos_ok, + pamqp.commands.Basic.ConsumeOk.name: self.basic_consume_ok, + pamqp.commands.Basic.CancelOk.name: self.basic_cancel_ok, + pamqp.commands.Basic.GetOk.name: self.basic_get_ok, + pamqp.commands.Basic.GetEmpty.name: self.basic_get_empty, + pamqp.commands.Basic.Deliver.name: self.basic_deliver, + pamqp.commands.Basic.Cancel.name: self.server_basic_cancel, + pamqp.commands.Basic.Ack.name: self.basic_server_ack, + pamqp.commands.Basic.Nack.name: self.basic_server_nack, + pamqp.commands.Basic.RecoverOk.name: self.basic_recover_ok, + pamqp.commands.Basic.Return.name: self.basic_return, + + pamqp.commands.Confirm.SelectOk.name: self.confirm_select_ok, } if frame.name not in methods: @@ -144,7 +144,7 @@ async def _write_frame_awaiting_response(self, waiter_id, channel_id, request, async def open(self): """Open the channel on the server.""" - request = pamqp.specification.Channel.Open() + request = pamqp.commands.Channel.Open() return (await self._write_frame_awaiting_response( 'open', self.channel_id, request, no_wait=False, check_open=False)) @@ -159,7 +159,7 @@ async def close(self, reply_code=0, reply_text="Normal Shutdown"): if not self.is_open: raise exceptions.ChannelClosed("channel already closed or closing") self.close_event.set() - request = pamqp.specification.Channel.Close(reply_code, reply_text, class_id=0, method_id=0) + request = pamqp.commands.Channel.Close(reply_code, reply_text, class_id=0, method_id=0) return (await self._write_frame_awaiting_response( 'close', self.channel_id, request, no_wait=False, check_open=False)) @@ -169,7 +169,7 @@ async def close_ok(self, frame): self.protocol.release_channel_id(self.channel_id) async def _send_channel_close_ok(self): - request = pamqp.specification.Channel.CloseOk() + request = pamqp.commands.Channel.CloseOk() await self._write_frame(self.channel_id, request) async def server_channel_close(self, frame): @@ -183,7 +183,7 @@ async def server_channel_close(self, frame): self.connection_closed(results['reply_code'], results['reply_text']) async def flow(self, active): - request = pamqp.specification.Channel.Flow(active) + request = pamqp.commands.Channel.Flow(active) return (await self._write_frame_awaiting_response( 'flow', self.channel_id, request, no_wait=False, check_open=False)) @@ -201,7 +201,7 @@ async def flow_ok(self, frame): async def exchange_declare(self, exchange_name, type_name, passive=False, durable=False, auto_delete=False, no_wait=False, arguments=None): - request = pamqp.specification.Exchange.Declare( + request = pamqp.commands.Exchange.Declare( exchange=exchange_name, exchange_type=type_name, passive=passive, @@ -222,7 +222,7 @@ async def exchange_declare_ok(self, frame): return future async def exchange_delete(self, exchange_name, if_unused=False, no_wait=False): - request = pamqp.specification.Exchange.Delete(exchange=exchange_name, if_unused=if_unused, nowait=no_wait) + request = pamqp.commands.Exchange.Delete(exchange=exchange_name, if_unused=if_unused, nowait=no_wait) return await self._write_frame_awaiting_response( 'exchange_delete', self.channel_id, request, no_wait) @@ -235,7 +235,7 @@ async def exchange_bind(self, exchange_destination, exchange_source, routing_key no_wait=False, arguments=None): if arguments is None: arguments = {} - request = pamqp.specification.Exchange.Bind( + request = pamqp.commands.Exchange.Bind( destination=exchange_destination, source=exchange_source, routing_key=routing_key, @@ -255,7 +255,7 @@ async def exchange_unbind(self, exchange_destination, exchange_source, routing_k if arguments is None: arguments = {} - request = pamqp.specification.Exchange.Unbind( + request = pamqp.commands.Exchange.Unbind( destination=exchange_destination, source=exchange_source, routing_key=routing_key, @@ -297,7 +297,7 @@ async def queue_declare(self, queue_name=None, passive=False, durable=False, if not queue_name: queue_name = 'aioamqp.gen-' + str(uuid.uuid4()) - request = pamqp.specification.Queue.Declare( + request = pamqp.commands.Queue.Declare( queue=queue_name, passive=passive, durable=durable, @@ -327,7 +327,7 @@ async def queue_delete(self, queue_name, if_unused=False, if_empty=False, no_wai if_empty: bool, the queue is deleted if it has no messages. Raise if not. no_wait: bool, if set, the server will not respond to the method """ - request = pamqp.specification.Queue.Delete( + request = pamqp.commands.Queue.Delete( queue=queue_name, if_unused=if_unused, if_empty=if_empty, @@ -346,7 +346,7 @@ async def queue_bind(self, queue_name, exchange_name, routing_key, no_wait=False if arguments is None: arguments = {} - request = pamqp.specification.Queue.Bind( + request = pamqp.commands.Queue.Bind( queue=queue_name, exchange=exchange_name, routing_key=routing_key, @@ -367,7 +367,7 @@ async def queue_unbind(self, queue_name, exchange_name, routing_key, arguments=N if arguments is None: arguments = {} - request = pamqp.specification.Queue.Unbind( + request = pamqp.commands.Queue.Unbind( queue=queue_name, exchange=exchange_name, routing_key=routing_key, @@ -383,7 +383,7 @@ async def queue_unbind_ok(self, frame): logger.debug("Queue unbound") async def queue_purge(self, queue_name, no_wait=False): - request = pamqp.specification.Queue.Purge( + request = pamqp.commands.Queue.Purge( queue=queue_name, nowait=no_wait ) return (await self._write_frame_awaiting_response( @@ -406,7 +406,7 @@ async def basic_publish(self, payload, exchange_name, routing_key, if properties is None: properties = {} - method_request = pamqp.specification.Basic.Publish( + method_request = pamqp.commands.Basic.Publish( exchange=exchange_name, routing_key=routing_key, mandatory=mandatory, @@ -417,7 +417,7 @@ async def basic_publish(self, payload, exchange_name, routing_key, header_request = pamqp.header.ContentHeader( body_size=len(payload), - properties=pamqp.specification.Basic.Properties(**properties) + properties=pamqp.commands.Basic.Properties(**properties) ) await self._write_frame(self.channel_id, header_request, drain=False) @@ -446,7 +446,7 @@ async def basic_qos(self, prefetch_size=0, prefetch_count=0, connection_global=F settings should apply per-consumer channel; and global=true to mean that the QoS settings should apply per-channel. """ - request = pamqp.specification.Basic.Qos( + request = pamqp.commands.Basic.Qos( prefetch_size, prefetch_count, connection_global ) return (await self._write_frame_awaiting_response( @@ -490,7 +490,7 @@ async def basic_consume(self, callback, queue_name='', consumer_tag='', no_local if arguments is None: arguments = {} - request = pamqp.specification.Basic.Consume( + request = pamqp.commands.Basic.Consume( queue=queue_name, consumer_tag=consumer_tag, no_local=no_local, @@ -561,7 +561,7 @@ async def server_basic_cancel(self, frame): callback, error) async def basic_cancel(self, consumer_tag, no_wait=False): - request = pamqp.specification.Basic.Cancel(consumer_tag, no_wait) + request = pamqp.commands.Basic.Cancel(consumer_tag, no_wait) return (await self._write_frame_awaiting_response( 'basic_cancel', self.channel_id, request, no_wait=no_wait) ) @@ -575,7 +575,7 @@ async def basic_cancel_ok(self, frame): logger.debug("Cancel ok") async def basic_get(self, queue_name='', no_ack=False): - request = pamqp.specification.Basic.Get(queue=queue_name, no_ack=no_ack) + request = pamqp.commands.Basic.Get(queue=queue_name, no_ack=no_ack) return (await self._write_frame_awaiting_response( 'basic_get', self.channel_id, request, no_wait=False) ) @@ -606,11 +606,11 @@ async def basic_get_empty(self, frame): future.set_exception(exceptions.EmptyQueue) async def basic_client_ack(self, delivery_tag, multiple=False): - request = pamqp.specification.Basic.Ack(delivery_tag, multiple) + request = pamqp.commands.Basic.Ack(delivery_tag, multiple) await self._write_frame(self.channel_id, request) async def basic_client_nack(self, delivery_tag, multiple=False, requeue=True): - request = pamqp.specification.Basic.Nack(delivery_tag, multiple, requeue) + request = pamqp.commands.Basic.Nack(delivery_tag, multiple, requeue) await self._write_frame(self.channel_id, request) async def basic_server_ack(self, frame): @@ -620,15 +620,15 @@ async def basic_server_ack(self, frame): fut.set_result(True) async def basic_reject(self, delivery_tag, requeue=False): - request = pamqp.specification.Basic.Reject(delivery_tag, requeue) + request = pamqp.commands.Basic.Reject(delivery_tag, requeue) await self._write_frame(self.channel_id, request) async def basic_recover_async(self, requeue=True): - request = pamqp.specification.Basic.RecoverAsync(requeue) + request = pamqp.commands.Basic.RecoverAsync(requeue) await self._write_frame(self.channel_id, request) async def basic_recover(self, requeue=True): - request = pamqp.specification.Basic.Recover(requeue) + request = pamqp.commands.Basic.Recover(requeue) return (await self._write_frame_awaiting_response( 'basic_recover', self.channel_id, request, no_wait=False) ) @@ -681,7 +681,7 @@ async def publish(self, payload, exchange_name, routing_key, properties=None, ma delivery_tag = next(self.delivery_tag_iter) # pylint: disable=stop-iteration-return fut = self._set_waiter('basic_server_ack_{}'.format(delivery_tag)) - method_request = pamqp.specification.Basic.Publish( + method_request = pamqp.commands.Basic.Publish( exchange=exchange_name, routing_key=routing_key, mandatory=mandatory, @@ -689,7 +689,7 @@ async def publish(self, payload, exchange_name, routing_key, properties=None, ma ) await self._write_frame(self.channel_id, method_request, drain=False) - properties = pamqp.specification.Basic.Properties(**properties) + properties = pamqp.commands.Basic.Properties(**properties) header_request = pamqp.header.ContentHeader( body_size=len(payload), properties=properties ) @@ -710,7 +710,7 @@ async def publish(self, payload, exchange_name, routing_key, properties=None, ma async def confirm_select(self, *, no_wait=False): if self.publisher_confirms: raise ValueError('publisher confirms already enabled') - request = pamqp.specification.Confirm.Select(nowait=no_wait) + request = pamqp.commands.Confirm.Select(nowait=no_wait) return (await self._write_frame_awaiting_response( 'confirm_select', self.channel_id, request, no_wait) diff --git a/aioamqp/frame.py b/aioamqp/frame.py index d70cfd7..af27ab5 100644 --- a/aioamqp/frame.py +++ b/aioamqp/frame.py @@ -42,7 +42,6 @@ import socket import pamqp.encode -import pamqp.specification import pamqp.frame from . import exceptions diff --git a/aioamqp/protocol.py b/aioamqp/protocol.py index e111dea..f0b928d 100644 --- a/aioamqp/protocol.py +++ b/aioamqp/protocol.py @@ -5,9 +5,9 @@ import asyncio import logging +import pamqp.commands import pamqp.frame import pamqp.heartbeat -import pamqp.specification from . import channel as amqp_channel from . import constants as amqp_constants @@ -159,7 +159,7 @@ async def close(self, no_wait=False, timeout=None): """Close connection (and all channels)""" await self.ensure_open() self.state = CLOSING - request = pamqp.specification.Connection.Close( + request = pamqp.commands.Connection.Close( reply_code=0, reply_text='', class_id=0, @@ -254,11 +254,11 @@ async def dispatch_frame(self, frame_channel=None, frame=None): """Dispatch the received frame to the corresponding handler""" method_dispatch = { - pamqp.specification.Connection.Close.name: self.server_close, - pamqp.specification.Connection.CloseOk.name: self.close_ok, - pamqp.specification.Connection.Tune.name: self.tune, - pamqp.specification.Connection.Start.name: self.start, - pamqp.specification.Connection.OpenOk.name: self.open_ok, + pamqp.commands.Connection.Close.name: self.server_close, + pamqp.commands.Connection.CloseOk.name: self.close_ok, + pamqp.commands.Connection.Tune.name: self.tune, + pamqp.commands.Connection.Start.name: self.start, + pamqp.commands.Connection.OpenOk.name: self.open_ok, } if frame_channel is None and frame is None: frame_channel, frame = await self.get_frame() @@ -395,7 +395,7 @@ async def start_ok(self, client_properties, mechanism, auth, locale): def credentials(): return '\0{LOGIN}\0{PASSWORD}'.format(**auth) - request = pamqp.specification.Connection.StartOk( + request = pamqp.commands.Connection.StartOk( client_properties=client_properties, mechanism=mechanism, locale=locale, @@ -417,7 +417,7 @@ async def server_close(self, frame): self._stream_writer.close() async def _close_ok(self): - request = pamqp.specification.Connection.CloseOk() + request = pamqp.commands.Connection.CloseOk() await self._write_frame(0, request) async def tune(self, frame): @@ -426,7 +426,7 @@ async def tune(self, frame): self.server_heartbeat = frame.heartbeat async def tune_ok(self, channel_max, frame_max, heartbeat): - request = pamqp.specification.Connection.TuneOk( + request = pamqp.commands.Connection.TuneOk( channel_max, frame_max, heartbeat ) await self._write_frame(0, request) @@ -436,7 +436,7 @@ async def secure_ok(self, login_response): async def open(self, virtual_host, capabilities='', insist=False): """Open connection to virtual host.""" - request = pamqp.specification.Connection.Open( + request = pamqp.commands.Connection.Open( virtual_host, capabilities, insist ) await self._write_frame(0, request) diff --git a/aioamqp/tests/testcase.py b/aioamqp/tests/testcase.py index 120104b..d6d702b 100644 --- a/aioamqp/tests/testcase.py +++ b/aioamqp/tests/testcase.py @@ -147,7 +147,7 @@ def server_version(self, amqp=None): if amqp is None: amqp = self.amqp - server_version = tuple(int(x) for x in amqp.server_properties['version'].decode().split('.')) + server_version = tuple(int(x) for x in amqp.server_properties['version'].split('.')) return server_version async def check_exchange_exists(self, exchange_name): From c900f6d5e8ef273000d221d0e46ab81ed4aed2a2 Mon Sep 17 00:00:00 2001 From: dzen Date: Wed, 25 Mar 2020 11:02:04 +0100 Subject: [PATCH 2/3] fix pamqp version for tests on travis --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a740243..2277b28 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ 'aioamqp', ], install_requires=[ - 'pamqp>=2.2.0,<3', + 'pamqp>=3.0.0', # TODO(bcalvez): for tests purpose, until 3.0 is released ], classifiers=[ "Development Status :: 4 - Beta", From 836340e8d881a93b0111b9aed6f2bb2926f38de6 Mon Sep 17 00:00:00 2001 From: dzen Date: Wed, 25 Mar 2020 11:37:45 +0100 Subject: [PATCH 3/3] pamqp 3.0 is no more compatible with 3.5 --- .travis.yml | 1 - setup.cfg | 2 +- setup.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1069e7c..46d6fea 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: python dist: bionic python: -- 3.5 - 3.6 - 3.7-dev - 3.8 diff --git a/setup.cfg b/setup.cfg index 0ab7d0b..d0ba16e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [bdist_wheel] -python-tag = py35.py36.py37.py38 +python-tag = py36.py37.py38 diff --git a/setup.py b/setup.py index 2277b28..c413078 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ install_requires=[ 'pamqp==3.0.0a6', # TODO(bcalvez): for tests purpose, until 3.0 is released ], + python_requires=">=3.6", classifiers=[ "Development Status :: 4 - Beta", "Intended Audience :: Developers", @@ -34,7 +35,6 @@ "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8",