kombu.transport.virtual
Virtual transport implementation.
Emulates the AMQ API for non-AMQ transports.
-
class kombu.transport.virtual.Transport(client, **kwargs)
Virtual transport.
-
Channel = <class 'kombu.transport.virtual.Channel'>
-
Cycle = <class 'kombu.transport.virtual.scheduling.FairCycle'>
-
polling_interval = 1.0
Time to sleep between unsuccessful polls.
-
default_port = None
Port number used when no port is specified.
-
state = <kombu.transport.virtual.BrokerState object>
BrokerState containing declared exchanges and
bindings (set by constructor).
-
cycle = None
FairCycle instance used to fairly drain events from channels (set by constructor).
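
To illustrate what "fairly drain events from channels" can mean, here is a minimal round-robin sketch. It is not kombu's FairCycle implementation; the class name RoundRobinCycle, the fetch helper, and the sample queues are hypothetical and exist only for this example. The idea is that each call resumes from the resource after the one tried last, so one busy channel cannot starve the others.

```python
from queue import Empty


class RoundRobinCycle:
    """Toy stand-in for a fair cycle (hypothetical, not kombu's FairCycle)."""

    def __init__(self, fun, resources):
        self.fun = fun                    # callable that fetches from one resource
        self.resources = list(resources)
        self.pos = 0                      # index of the next resource to try

    def get(self):
        # Try each resource at most once, starting where the last call stopped.
        for _ in range(len(self.resources)):
            resource = self.resources[self.pos]
            self.pos = (self.pos + 1) % len(self.resources)
            try:
                return self.fun(resource), resource
            except Empty:
                continue
        # Every resource was empty on this pass.
        raise Empty()


# Hypothetical in-memory "channels" used only for the demonstration.
queues = {"a": ["a1", "a2", "a3"], "b": ["b1"], "c": []}


def fetch(name):
    if not queues[name]:
        raise Empty()
    return queues[name].pop(0)


cycle = RoundRobinCycle(fetch, ["a", "b", "c"])
drained = []
while True:
    try:
        item, _source = cycle.get()
        drained.append(item)
    except Empty:
        break

print(drained)  # ['a1', 'b1', 'a2', 'a3']
```

Note that "b1" is delivered before "a2" even though channel "a" still has messages waiting.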
-
establish_connection()
-
close_connection(connection)
-
create_channel(connection)
-
close_channel(channel)
-
drain_events(connection, timeout=None)
-
class kombu.transport.virtual.AbstractChannel
This is an abstract class defining the channel methods
you’d usually want to implement in a virtual channel.
Do not subclass directly, but rather inherit from Channel
instead.
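
As a rough picture of what such channel methods look like, the sketch below models a queue store in memory using only the standard library. The hook names _put, _get, _size and _purge are an assumption: this reference does not list the methods of AbstractChannel, so check the source before relying on them. The class InMemoryChannelSketch is hypothetical and deliberately does not inherit from any kombu class.

```python
from collections import defaultdict, deque
from queue import Empty


class InMemoryChannelSketch:
    """Hypothetical stand-in showing the kind of hooks a virtual channel fills in."""

    def __init__(self):
        self._queues = defaultdict(deque)   # queue name -> pending messages

    def _put(self, queue, message, **kwargs):
        # Store a message at the tail of the named queue.
        self._queues[queue].append(message)

    def _get(self, queue, timeout=None):
        # Return the oldest message, or signal "nothing available" with Empty.
        try:
            return self._queues[queue].popleft()
        except IndexError:
            raise Empty()

    def _size(self, queue):
        # Number of messages currently waiting in the queue.
        return len(self._queues[queue])

    def _purge(self, queue):
        # Drop all waiting messages and report how many were removed.
        removed = len(self._queues[queue])
        self._queues[queue].clear()
        return removed


channel = InMemoryChannelSketch()
channel._put("tasks", {"body": "first"})
channel._put("tasks", {"body": "second"})
first = channel._get("tasks")
size_after_get = channel._size("tasks")
purged = channel._purge("tasks")
```

In a real transport these hooks would talk to the backing store (a database, a key-value server, a filesystem directory), while the generic Channel logic built on top stays the same.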
-
class kombu.transport.virtual.Channel(connection, **kwargs)
Virtual channel.
Parameters: connection – The transport instance this channel is part of.
-
Message = <class 'kombu.transport.virtual.Message'>
Message class used.
-
state
Broker state containing exchanges and bindings.
-
qos
QoS manager for this channel.
-
do_restore = True
Flag to restore unacked messages when the channel
goes out of scope.
-
exchange_types = {'topic': <class 'kombu.transport.virtual.exchange.TopicExchange'>, 'fanout': <class 'kombu.transport.virtual.exchange.FanoutExchange'>, 'direct': <class 'kombu.transport.virtual.exchange.DirectExchange'>}
Mapping of exchange types to their corresponding classes.
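
The three exchange types differ in how a routing key is matched against bindings. The sketch below shows the general idea using plain functions; it is not the code of DirectExchange, FanoutExchange or TopicExchange, and the helper names topic_to_regex and lookup are hypothetical. The topic translation is simplified: "*" matches exactly one dot-separated word and "#" matches any remaining characters.

```python
import re


def topic_to_regex(pattern):
    """Translate a topic binding pattern into a compiled regex (simplified)."""
    parts = []
    for word in pattern.split("."):
        if word == "*":
            parts.append(r"[^.]+")       # exactly one word
        elif word == "#":
            parts.append(r".*")          # anything, including further dots
        else:
            parts.append(re.escape(word))
    return re.compile(r"^" + r"\.".join(parts) + r"$")


def lookup(exchange_type, bindings, routing_key):
    """Return the queues a message would be routed to.

    bindings is a list of (binding_key, queue_name) pairs.
    """
    if exchange_type == "fanout":
        # Fanout ignores the routing key entirely.
        return [queue for _key, queue in bindings]
    if exchange_type == "direct":
        # Direct requires an exact match on the binding key.
        return [queue for key, queue in bindings if key == routing_key]
    if exchange_type == "topic":
        return [queue for key, queue in bindings
                if topic_to_regex(key).match(routing_key)]
    raise ValueError("unknown exchange type: %r" % (exchange_type,))


bindings = [
    ("stock.us.*", "us"),
    ("stock.#", "all"),
    ("stock.us.nyse", "nyse"),
]

direct_hits = lookup("direct", bindings, "stock.us.nyse")   # ['nyse']
fanout_hits = lookup("fanout", bindings, "ignored")         # ['us', 'all', 'nyse']
topic_hits = lookup("topic", bindings, "stock.eu.lse")      # ['all']
```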
-
exchange_declare(exchange=None, type='direct', durable=False, auto_delete=False, arguments=None, nowait=False, passive=False)
Declare exchange.
-
exchange_delete(exchange, if_unused=False, nowait=False)
Delete exchange and all its bindings.
-
queue_declare(queue=None, passive=False, **kwargs)
Declare queue.
-
queue_delete(queue, if_unusued=False, if_empty=False, **kwargs)
Delete queue.
-
queue_bind(queue, exchange=None, routing_key='', arguments=None, **kwargs)
Bind queue to exchange with routing key.
-
queue_purge(queue, **kwargs)
Remove all ready messages from queue.
-
basic_publish(message, exchange, routing_key, **kwargs)
Publish message.
-
basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)
Consume from queue.
-
basic_cancel(consumer_tag)
Cancel consumer by consumer tag.
-
basic_get(queue, no_ack=False, **kwargs)
Get message by direct access (synchronous).
-
basic_ack(delivery_tag)
Acknowledge message.
-
basic_recover(requeue=False)
Recover unacked messages.
-
basic_reject(delivery_tag, requeue=False)
Reject message.
-
basic_qos(prefetch_size=0, prefetch_count=0, apply_global=False)
Change QoS settings for this channel.
Only prefetch_count is supported.
-
get_table(exchange)
Get table of bindings for exchange.
-
typeof(exchange, default='direct')
Get the exchange type instance for exchange.
-
drain_events(timeout=None)
-
prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)
Prepare message data.
-
message_to_python(raw_message)
Convert raw message to Message instance.
-
flow(active=True)
Enable/disable message flow.
Raises: NotImplementedError – as flow is not implemented by the base virtual implementation.
-
close()
Close channel, cancel all consumers, and requeue unacked
messages.
-
class kombu.transport.virtual.Message(channel, payload, **kwargs)
-
exception MessageStateError
The message has already been acknowledged.
-
args
-
message
-
Message.accept
-
Message.ack()
Acknowledge this message as being processed.
This will remove the message from the queue.
Raises: MessageStateError – if the message has already been acknowledged/requeued/rejected.
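
The rule that a message can be settled only once (acked, rejected, or requeued) can be sketched as a small state machine. This is an illustration under assumptions, not the library's Message class: TrackedMessage, its state names, and the locally defined MessageStateError are all stand-ins written for this example.

```python
class MessageStateError(Exception):
    """Local stand-in for the exception of the same name."""


class TrackedMessage:
    """Hypothetical message that refuses to be settled twice."""

    FINAL_STATES = ("ACK", "REJECTED", "REQUEUED")

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag
        self.state = "RECEIVED"

    def _settle(self, new_state):
        # Any of the three final states blocks every further transition.
        if self.state in self.FINAL_STATES:
            raise MessageStateError(
                "message already settled with state %s" % self.state)
        self.state = new_state

    def ack(self):
        self._settle("ACK")

    def reject(self):
        self._settle("REJECTED")

    def requeue(self):
        self._settle("REQUEUED")


msg = TrackedMessage(delivery_tag=1)
msg.ack()
try:
    msg.requeue()                 # second settlement attempt
    second_call_failed = False
except MessageStateError:
    second_call_failed = True
```

Consumer code that may see the same message object on more than one code path can catch MessageStateError rather than tracking the state itself.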
-
Message.ack_log_error(logger, errors)
-
Message.acknowledged
Set to true if the message has been acknowledged.
-
Message.body
-
Message.channel
-
Message.content_encoding
-
Message.content_type
-
Message.decode()
Deserialize the message body, returning the original
python structure sent by the publisher.
-
Message.delivery_info
-
Message.delivery_tag
-
Message.payload
The decoded message body.
-
Message.properties
-
Message.reject()
Reject this message.
The message will be discarded by the server.
Raises: MessageStateError – if the message has already been acknowledged/requeued/rejected.
-
Message.reject_log_error(logger, errors)
-
Message.requeue()
Reject this message and put it back on the queue.
You must not use this method as a means of selecting messages
to process.
Raises: MessageStateError – if the message has already been acknowledged/requeued/rejected.
-
Message.serializable()
-
class kombu.transport.virtual.QoS(channel, prefetch_count=0)
Quality of Service guarantees.
Only supports prefetch_count at this point.
Parameters:
- channel – AMQ Channel.
- prefetch_count – Initial prefetch count (defaults to 0).
-
ack(delivery_tag)
Acknowledge message and remove from transactional state.
-
append(message, delivery_tag)
Append message to transactional state.
-
can_consume()
Returns true if the channel can be consumed from.
Used to ensure the client adheres to currently active
prefetch limits.
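
A prefetch limit caps the number of delivered but not yet acknowledged messages. The sketch below models that check with a dictionary of outstanding deliveries. PrefetchWindow is a hypothetical class, not the QoS class documented here, and it assumes the usual AMQP convention that a prefetch count of 0 means no limit.

```python
class PrefetchWindow:
    """Hypothetical model of a prefetch-count check."""

    def __init__(self, prefetch_count=0):
        self.prefetch_count = prefetch_count
        self._delivered = {}      # delivery_tag -> message awaiting ack

    def can_consume(self):
        # Assumption: 0 disables the limit; otherwise compare against
        # the number of messages still waiting for acknowledgement.
        if not self.prefetch_count:
            return True
        return len(self._delivered) < self.prefetch_count

    def append(self, message, delivery_tag):
        self._delivered[delivery_tag] = message

    def ack(self, delivery_tag):
        self._delivered.pop(delivery_tag, None)


window = PrefetchWindow(prefetch_count=2)
window.append("m1", delivery_tag=1)
after_one = window.can_consume()      # True: one slot left
window.append("m2", delivery_tag=2)
after_two = window.can_consume()      # False: window is full
window.ack(1)
after_ack = window.can_consume()      # True: the ack freed a slot
```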
-
get(delivery_tag)
-
prefetch_count = 0
Current prefetch count value.
-
reject(delivery_tag, requeue=False)
Remove message from transactional state, requeueing it if requeue is true.
-
restore_at_shutdown = True
If disabled, unacked messages won’t be restored at shutdown.
-
restore_unacked()
Restore all unacknowledged messages.
-
restore_unacked_once()
Restores all unacknowledged messages at shutdown/gc collect.
Will only be done once for each instance.
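
The "only once" guarantee can be pictured as a guard flag around the restore step. The following is a minimal sketch under assumptions, not the library's code: UnackedStore, its requeued list, and the _restored flag are hypothetical names chosen for the example.

```python
class UnackedStore:
    """Hypothetical holder of unacknowledged messages."""

    def __init__(self, restore_at_shutdown=True):
        self.restore_at_shutdown = restore_at_shutdown
        self._delivered = {}      # delivery_tag -> message
        self._restored = False    # guard so the restore runs a single time
        self.requeued = []        # stands in for "put back on the queue"

    def append(self, message, delivery_tag):
        self._delivered[delivery_tag] = message

    def restore_unacked(self):
        # Move every outstanding message back, oldest delivery first.
        for tag in list(self._delivered):
            self.requeued.append(self._delivered.pop(tag))

    def restore_unacked_once(self):
        # Skip if restoring is disabled or has already happened.
        if not self.restore_at_shutdown or self._restored:
            return
        self._restored = True
        self.restore_unacked()


store = UnackedStore()
store.append("m1", delivery_tag=1)
store.append("m2", delivery_tag=2)
store.restore_unacked_once()
store.append("m3", delivery_tag=3)
store.restore_unacked_once()      # no effect: already restored once
```

After the second call "m3" is still outstanding, which shows why the once-only variant suits shutdown paths that may be triggered more than once.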
-
restore_visible(start=0, num=10, interval=10)
-
class kombu.transport.virtual.BrokerState(exchanges=None, bindings=None)
-
bindings = None
Active bindings.
-
clear()
-
exchanges = None
Exchange declarations.