首页 > 代码库 > spring的RabbitTemplate 接收Message源码导读

spring的RabbitTemplate 接收Message源码导读

1,首先调用类SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法。内部类的主要属性如下

		private final BlockingQueueConsumer consumer;		private final CountDownLatch start;		private volatile FatalListenerStartupException startupException;


2,内部类的run方法如下

			boolean aborted = false;			int consecutiveIdles = 0;			int consecutiveMessages = 0;			try {				try {					SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();					this.consumer.start();					this.start.countDown();				}				catch (QueuesNotAvailableException e) {					if (SimpleMessageListenerContainer.this.missingQueuesFatal) {						throw e;					}					else {						this.start.countDown();						handleStartupFailure(e);						throw e;					}				}				catch (FatalListenerStartupException ex) {					throw ex;				}				catch (Throwable t) {					this.start.countDown();					handleStartupFailure(t);					throw t;				}				if (SimpleMessageListenerContainer.this.transactionManager != null) {					/*					 * Register the consumer's channel so it will be used by the transaction manager					 * if it's an instance of RabbitTransactionManager.					 */					ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory());				}				// Always better to stop receiving as soon as possible if				// transactional				boolean continuable = false;				while (isActive(this.consumer) || continuable) {					try {						// Will come back false when the queue is drained						continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();						if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {							if (continuable) {								consecutiveIdles = 0;								if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {									considerAddingAConsumer();									consecutiveMessages = 0;								}							}							else {								consecutiveMessages = 0;								if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {									considerStoppingAConsumer(this.consumer);									consecutiveIdles = 0;								}							}						}					}					catch (ListenerExecutionFailedException ex) {						// Continue to process, otherwise re-throw					}					catch (AmqpRejectAndDontRequeueException rejectEx) {						/*						 *  These will normally be wrapped by an LEFE if thrown by the						 *  listener, but we will also honor it if thrown by an						 *  error handler.						 */					}				}			}			catch (InterruptedException e) {				logger.debug("Consumer thread interrupted, processing stopped.");				Thread.currentThread().interrupt();				aborted = true;			}			catch (QueuesNotAvailableException ex) {				if (SimpleMessageListenerContainer.this.missingQueuesFatal) {					logger.error("Consumer received fatal exception on startup", ex);					this.startupException = ex;					// Fatal, but no point re-throwing, so just abort.					aborted = true;				}			}			catch (FatalListenerStartupException ex) {				logger.error("Consumer received fatal exception on startup", ex);				this.startupException = ex;				// Fatal, but no point re-throwing, so just abort.				aborted = true;			}			catch (FatalListenerExecutionException ex) {				logger.error("Consumer received fatal exception during processing", ex);				// Fatal, but no point re-throwing, so just abort.				aborted = true;			}			catch (ShutdownSignalException e) {				if (RabbitUtils.isNormalShutdown(e)) {					if (logger.isDebugEnabled()) {						logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());					}				}				else {					this.logConsumerException(e);				}			}			catch (AmqpIOException e) {				if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException						&& e.getCause().getCause().getMessage().contains("in exclusive use")) {					logger.warn(e.getCause().getCause().toString());				}				else {					this.logConsumerException(e);				}			}			catch (Error e) {				logger.error("Consumer thread error, thread abort.", e);				aborted = true;			}			catch (Throwable t) {				this.logConsumerException(t);			}			finally {				if (SimpleMessageListenerContainer.this.transactionManager != null) {					ConsumerChannelRegistry.unRegisterConsumerChannel();				}			}			// In all cases count down to allow container to progress beyond startup			start.countDown();			if (!isActive(consumer) || aborted) {				logger.debug("Cancelling " + this.consumer);				try {					this.consumer.stop();					synchronized (consumersMonitor) {						if (SimpleMessageListenerContainer.this.consumers != null) {							SimpleMessageListenerContainer.this.consumers.remove(this.consumer);						}					}				}				catch (AmqpException e) {					logger.info("Could not cancel message consumer", e);				}				if (aborted) {					logger.error("Stopping container from aborted consumer");					stop();				}			}			else {				logger.info("Restarting " + this.consumer);				restart(this.consumer);			}		

3,run方法调用类BlockingQueueConsumer的start方法,BlockingQueueConsumer的属性如下

	private final BlockingQueue<Delivery> queue;	// When this is non-null the connection has been closed (should never happen in normal operation).	private volatile ShutdownSignalException shutdown;	private final String[] queues;	private final int prefetchCount;	private final boolean transactional;	private Channel channel;	private RabbitResourceHolder resourceHolder;	private InternalConsumer consumer;	private final AtomicBoolean cancelled = new AtomicBoolean(false);	private volatile long shutdownTimeout;	private final AtomicBoolean cancelReceived = new AtomicBoolean(false);	private final AcknowledgeMode acknowledgeMode;	private final ConnectionFactory connectionFactory;	private final MessagePropertiesConverter messagePropertiesConverter;	private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;	private final Map<String, Object> consumerArgs = new HashMap<String, Object>();	private final boolean exclusive;	private final Set<Long> deliveryTags = new LinkedHashSet<Long>();	private final boolean defaultRequeuRejected;	private final CountDownLatch suspendClientThread = new CountDownLatch(1);	private final Collection<String> consumerTags = Collections.synchronizedSet(new HashSet<String>());	private final Set<String> missingQueues = Collections.synchronizedSet(new HashSet<String>());	private final long retryDeclarationInterval = 60000;	private long lastRetryDeclaration;


4,调用ConnectionFactoryUtils获取connection

	private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionFactory connectionFactory,			ResourceFactory resourceFactory) {		Assert.notNull(connectionFactory, "ConnectionFactory must not be null");		Assert.notNull(resourceFactory, "ResourceFactory must not be null");		RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager				.getResource(connectionFactory);		if (resourceHolder != null) {			Channel channel = resourceFactory.getChannel(resourceHolder);			if (channel != null) {				return resourceHolder;			}		}		RabbitResourceHolder resourceHolderToUse = resourceHolder;		if (resourceHolderToUse == null) {			resourceHolderToUse = new RabbitResourceHolder();		}		Connection connection = resourceFactory.getConnection(resourceHolderToUse);		Channel channel = null;		try {			/*			 * If we are in a listener container, first see if there's a channel registered			 * for this consumer and the consumer is using the same connection factory.			 */			channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);			if (channel == null && connection == null) {				connection = resourceFactory.createConnection();				resourceHolderToUse.addConnection(connection);			}			if (channel == null) {				channel = resourceFactory.createChannel(connection);			}			resourceHolderToUse.addChannel(channel, connection);			if (resourceHolderToUse != resourceHolder) {				bindResourceToTransaction(resourceHolderToUse, connectionFactory,						resourceFactory.isSynchedLocalTransactionAllowed());			}			return resourceHolderToUse;		} catch (IOException ex) {			RabbitUtils.closeChannel(channel);			RabbitUtils.closeConnection(connection);			throw new AmqpIOException(ex);		}	}


 

5,调用类CachingConnectionFactory的createConnection

	@Override	public final Connection createConnection() throws AmqpException {		synchronized (this.connectionMonitor) {			if (this.cacheMode == CacheMode.CHANNEL) {				if (this.connection == null) {					this.connection = new ChannelCachingConnectionProxy(super.createBareConnection());					// invoke the listener *after* this.connection is assigned					getConnectionListener().onCreate(connection);				}				return this.connection;			}			else if (this.cacheMode == CacheMode.CONNECTION) {				ChannelCachingConnectionProxy connection = null;				while (connection == null && !this.idleConnections.isEmpty()) {					connection = this.idleConnections.poll();					if (connection != null) {						if (!connection.isOpen()) {							if (logger.isDebugEnabled()) {								logger.debug("Removing closed connection '" + connection + "'");							}							connection.notifyCloseIfNecessary();							this.openConnections.remove(connection);							this.openConnectionNonTransactionalChannels.remove(connection);							this.openConnectionTransactionalChannels.remove(connection);							connection = null;						}					}				}				if (connection == null) {					connection = new ChannelCachingConnectionProxy(super.createBareConnection());					getConnectionListener().onCreate(connection);					if (logger.isDebugEnabled()) {						logger.debug("Adding new connection '" + connection + "'");					}					this.openConnections.add(connection);					this.openConnectionNonTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());					this.openConnectionTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());				}				else {					if (logger.isDebugEnabled()) {						logger.debug("Obtained connection '" + connection + "' from cache");					}				}				return connection;			}		}		return null;	}

6,调用类ConnectionFactory的newConnection方法

    public Connection newConnection(ExecutorService executor, Address[] addrs)        throws IOException    {        FrameHandlerFactory fhFactory = createFrameHandlerFactory();        ConnectionParams params = params(executor);        if (isAutomaticRecoveryEnabled()) {            // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection            AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);            conn.init();            return conn;        } else {            IOException lastException = null;            for (Address addr : addrs) {                try {                    FrameHandler handler = fhFactory.create(addr);                    AMQConnection conn = new AMQConnection(params, handler);                    conn.start();                    return conn;                } catch (IOException e) {                    lastException = e;                }            }            throw (lastException != null) ? lastException : new IOException("failed to connect");        }    }

7,调用AMQConnection的start方法

 public void start()        throws IOException    {        initializeConsumerWorkService();        initializeHeartbeatSender();        this._running = true;        // Make sure that the first thing we do is to send the header,        // which should cause any socket errors to show up for us, rather        // than risking them pop out in the MainLoop        AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =            new AMQChannel.SimpleBlockingRpcContinuation();        // We enqueue an RPC continuation here without sending an RPC        // request, since the protocol specifies that after sending        // the version negotiation header, the client (connection        // initiator) is to wait for a connection.start method to        // arrive.        _channel0.enqueueRpc(connStartBlocker);        try {            // The following two lines are akin to AMQChannel's            // transmit() method for this pseudo-RPC.            _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);            _frameHandler.sendHeader();        } catch (IOException ioe) {            _frameHandler.close();            throw ioe;        }        // start the main loop going        MainLoop loop = new MainLoop();        final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();        mainLoopThread = Environment.newThread(threadFactory, loop, name);        mainLoopThread.start();        // after this point clear-up of MainLoop is triggered by closing the frameHandler.        AMQP.Connection.Start connStart = null;        AMQP.Connection.Tune connTune = null;        try {            connStart =                (AMQP.Connection.Start) connStartBlocker.getReply().getMethod();            _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());            Version serverVersion =                new Version(connStart.getVersionMajor(),                            connStart.getVersionMinor());            if (!Version.checkVersion(clientVersion, serverVersion)) {                throw new ProtocolVersionMismatchException(clientVersion,                                                           serverVersion);            }            String[] mechanisms = connStart.getMechanisms().toString().split(" ");            SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);            if (sm == null) {                throw new IOException("No compatible authentication mechanism found - " +                        "server offered [" + connStart.getMechanisms() + "]");            }            LongString challenge = null;            LongString response = sm.handleChallenge(null, this.username, this.password);            do {                Method method = (challenge == null)                    ? new AMQP.Connection.StartOk.Builder()                                    .clientProperties(_clientProperties)                                    .mechanism(sm.getName())                                    .response(response)                          .build()                    : new AMQP.Connection.SecureOk.Builder().response(response).build();                try {                    Method serverResponse = _channel0.rpc(method).getMethod();                    if (serverResponse instanceof AMQP.Connection.Tune) {                        connTune = (AMQP.Connection.Tune) serverResponse;                    } else {                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();                        response = sm.handleChallenge(challenge, this.username, this.password);                    }                } catch (ShutdownSignalException e) {                    Method shutdownMethod = e.getReason();                    if (shutdownMethod instanceof AMQP.Connection.Close) {                        AMQP.Connection.Close shutdownClose =  (AMQP.Connection.Close) shutdownMethod;                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {                            throw new AuthenticationFailureException(shutdownClose.getReplyText());                        }                    }                    throw new PossibleAuthenticationFailureException(e);                }            } while (connTune == null);        } catch (ShutdownSignalException sse) {            _frameHandler.close();            throw AMQChannel.wrap(sse);        } catch(IOException ioe) {            _frameHandler.close();            throw ioe;        }        try {            int channelMax =                negotiateChannelMax(this.requestedChannelMax,                                    connTune.getChannelMax());            _channelManager = instantiateChannelManager(channelMax, threadFactory);            int frameMax =                negotiatedMaxValue(this.requestedFrameMax,                                   connTune.getFrameMax());            this._frameMax = frameMax;            int heartbeat =                negotiatedMaxValue(this.requestedHeartbeat,                                   connTune.getHeartbeat());            setHeartbeat(heartbeat);            _channel0.transmit(new AMQP.Connection.TuneOk.Builder()                                .channelMax(channelMax)                                .frameMax(frameMax)                                .heartbeat(heartbeat)                              .build());            _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()                                      .virtualHost(_virtualHost)                                    .build());        } catch (IOException ioe) {            _heartbeatSender.shutdown();            _frameHandler.close();            throw ioe;        } catch (ShutdownSignalException sse) {            _heartbeatSender.shutdown();            _frameHandler.close();            throw AMQChannel.wrap(sse);        }        // We can now respond to errors having finished tailoring the connection        this._inConnectionNegotiation = false;        return;    }


8,默认queue以及Channel的值

Queue [name=app_queue, durable=true, autoDelete=false, exclusive=false, arguments=null]
Cached Rabbit Channel: AMQChannel(amqp://ad@192.168.120.12:5672/,1)


9,调用RabbitAdmin完成相关通道和绑定等的创建

10,ConnectionFactoryUtils.getTransactionalResourceHolder完成后,开始建立消费者BlockingQueueConsumer的start方法中

		this.consumer = new InternalConsumer(channel);		this.deliveryTags.clear();		this.activeObjectCounter.add(this);


11,SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法中调用

while (isActive(this.consumer) || continuable) {					try {						// Will come back false when the queue is drained						continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();						if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {							if (continuable) {								consecutiveIdles = 0;								if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {									considerAddingAConsumer();									consecutiveMessages = 0;								}							}							else {								consecutiveMessages = 0;								if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {									considerStoppingAConsumer(this.consumer);									consecutiveIdles = 0;								}							}						}					}					catch (ListenerExecutionFailedException ex) {						// Continue to process, otherwise re-throw					}					catch (AmqpRejectAndDontRequeueException rejectEx) {						/*						 *  These will normally be wrapped by an LEFE if thrown by the						 *  listener, but we will also honor it if thrown by an						 *  error handler.						 */					}				}


 

12,后续调用业务onmessage见下图

 

 

注意:消费者默认属性

 

acknowledgeMode  prefetchCount=1
AcknowledgeMode AutoAck

spring的RabbitTemplate 接收Message源码导读