首页 > 代码库 > spring的RabbitTemplate 发送Message源码导读

spring的RabbitTemplate 发送Message源码导读


1,首先业务方法调用RabbitTemplate的convertAndSend方法(RabbitTemplate继承RabbitAccessor,实现了RabbitOperations和MessageListener接口):

/**
 * Convenience overload that forwards to the four-argument
 * {@code convertAndSend}, passing {@code null} as the correlation data.
 *
 * @param exchange   forwarded unchanged to the four-argument overload
 * @param routingKey forwarded unchanged to the four-argument overload
 * @param object     the payload, forwarded unchanged to the four-argument overload
 * @throws AmqpException declared by this method; nothing is thrown here directly
 */
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
	// The explicit cast gives the null argument a static type so that the
	// overload taking a CorrelationData as fourth parameter is selected.
	convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}


 

 

 

2,convertAndSend调用自己的重载方法(注:上一步传入的是四个参数,而下面贴出的是带MessagePostProcessor参数的五参数重载):

	/**
	 * Converts the payload with {@code convertMessageIfNecessary}, runs the result
	 * through the supplied post processor, and passes the post-processed message
	 * to {@code send}.
	 *
	 * @param exchange             forwarded unchanged to {@code send}
	 * @param routingKey           forwarded unchanged to {@code send}
	 * @param message              the payload handed to {@code convertMessageIfNecessary}
	 * @param messagePostProcessor applied to the converted message before sending;
	 *                             it is dereferenced without a null check
	 * @param correlationData      forwarded unchanged to {@code send}
	 * @throws AmqpException declared by this method
	 */
	public void convertAndSend(String exchange, String routingKey, final Object message,
			final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
		Message messageToSend = convertMessageIfNecessary(message);
		// The post processor's return value replaces the converted message.
		messageToSend = messagePostProcessor.postProcessMessage(messageToSend);
		send(exchange, routingKey, messageToSend, correlationData);
	}


3,convertAndSend调用send方法:

	/**
	 * Wraps the publish in a {@code ChannelCallback} and runs it through
	 * {@code execute}; the callback itself only calls {@code doSend} with the
	 * channel it is given.
	 *
	 * @param exchange        passed through to {@code doSend}
	 * @param routingKey      passed through to {@code doSend}
	 * @param message         passed through to {@code doSend}
	 * @param correlationData passed through to {@code doSend}
	 * @throws AmqpException declared by this method
	 */
	public void send(final String exchange, final String routingKey,
			final Message message, final CorrelationData correlationData)
			throws AmqpException {
		execute(new ChannelCallback<Object>() {
			@Override
			public Object doInRabbit(Channel channel) throws Exception {
				doSend(channel, exchange, routingKey, message, correlationData);
				// The callback has no result; the value returned by execute is discarded.
				return null;
			}
		});
	}


4,send调用execute方法进行消息发送:

	/**
	 * Runs the callback through {@code doExecute}, going through the configured
	 * {@code retryTemplate} when one is set and calling {@code doExecute}
	 * directly otherwise.
	 *
	 * @param action the callback handed to {@code doExecute}
	 * @return whatever {@code doExecute} returns for the callback
	 */
	@Override
	public <T> T execute(final ChannelCallback<T> action) {
		if (this.retryTemplate != null) {
			try {
				return this.retryTemplate.execute(new RetryCallback<T, Exception>() {
					@Override
					public T doWithRetry(RetryContext context) throws Exception {
						return RabbitTemplate.this.doExecute(action);
					}
				});
			}
			catch (Exception e) {
				// Runtime exceptions are rethrown as they are; any other exception
				// is translated by RabbitExceptionTranslator before being thrown.
				if (e instanceof RuntimeException) {
					throw (RuntimeException) e;
				}
				throw RabbitExceptionTranslator.convertRabbitAccessException(e);
			}
		}
		else {
			return this.doExecute(action);
		}
	}


5,execute方法调用doExecute

	/**
	 * Obtains a channel from the transactional resource holder, invokes the
	 * callback on it, and always releases the holder's resources afterwards.
	 *
	 * @param action the callback to invoke; asserted to be non-null
	 * @return the value returned by {@code action.doInRabbit(channel)}
	 */
	private <T> T doExecute(ChannelCallback<T> action) {
		Assert.notNull(action, "Callback object must not be null");
		RabbitResourceHolder resourceHolder = getTransactionalResourceHolder();
		Channel channel = resourceHolder.getChannel();
		// A listener is added to the channel only when a confirm or return callback is configured.
		if (this.confirmCallback != null || this.returnCallback != null) {
			addListener(channel);
		}
		try {
			if (logger.isDebugEnabled()) {
				logger.debug("Executing callback on RabbitMQ Channel: " + channel);
			}
			return action.doInRabbit(channel);
		}
		catch (Exception ex) {
			// Roll back everything held by the resource holder when the channel is
			// locally transacted, then rethrow the converted exception.
			if (isChannelLocallyTransacted(channel)) {
				resourceHolder.rollbackAll();
			}
			throw convertRabbitAccessException(ex);
		}
		finally {
			// Runs on both the success and the failure path.
			ConnectionFactoryUtils.releaseResources(resourceHolder);
		}
	}


6,doExecute回调send方法中传入的ChannelCallback的doInRabbit方法

	/**
	 * Shown again to highlight the anonymous {@code ChannelCallback}: its
	 * {@code doInRabbit} method is what receives the channel and calls
	 * {@code doSend}.
	 *
	 * @param exchange        passed through to {@code doSend}
	 * @param routingKey      passed through to {@code doSend}
	 * @param message         passed through to {@code doSend}
	 * @param correlationData passed through to {@code doSend}
	 * @throws AmqpException declared by this method
	 */
	public void send(final String exchange, final String routingKey,
			final Message message, final CorrelationData correlationData)
			throws AmqpException {
		execute(new ChannelCallback<Object>() {
			@Override
			public Object doInRabbit(Channel channel) throws Exception {
				// Entry point of the callback: publish on the supplied channel.
				doSend(channel, exchange, routingKey, message, correlationData);
				return null;
			}
		});
	}


7,doInRabbit调用doSend方法

	protected void doSend(Channel channel, String exchange, String routingKey, Message message,			CorrelationData correlationData) throws Exception {		if (logger.isDebugEnabled()) {			logger.debug("Publishing message on exchange [" + exchange + "], routingKey = [" + routingKey + "]");		}		if (exchange == null) {			// try to send to configured exchange			exchange = this.exchange;		}		if (routingKey == null) {			// try to send to configured routing key			routingKey = this.routingKey;		}		if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {			PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;			publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),					new PendingConfirm(correlationData, System.currentTimeMillis()));		}		boolean mandatory = this.returnCallback != null && this.mandatory;		MessageProperties messageProperties = message.getMessageProperties();		if (mandatory) {			messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION, this.uuid);		}		BasicProperties convertedMessageProperties = this.messagePropertiesConverter				.fromMessageProperties(messageProperties, encoding);		channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());		// Check if commit needed		if (isChannelLocallyTransacted(channel)) {			// Transacted channel created by this template -> commit.			RabbitUtils.commitIfNecessary(channel);		}	}


8,由于channel是代理对象,对channel方法的调用会进入CachingConnectionFactory(其内部的InvocationHandler)的invoke方法

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {			String methodName = method.getName();			if (methodName.equals("txSelect") && !this.transactional) {				throw new UnsupportedOperationException("Cannot start transaction on non-transactional channel");			}			if (methodName.equals("equals")) {				// Only consider equal when proxies are identical.				return (proxy == args[0]);			}			else if (methodName.equals("hashCode")) {				// Use hashCode of Channel proxy.				return System.identityHashCode(proxy);			}			else if (methodName.equals("toString")) {				return "Cached Rabbit Channel: " + this.target;			}			else if (methodName.equals("close")) {				// Handle close method: don't pass the call on.				if (active) {					synchronized (this.channelList) {						if (!RabbitUtils.isPhysicalCloseRequired() && this.channelList.size() < getChannelCacheSize()) {							logicalClose((ChannelProxy) proxy);							// Remain open in the channel list.							return null;						}					}				}				// If we get here, we're supposed to shut down.				physicalClose();				return null;			}			else if (methodName.equals("getTargetChannel")) {				// Handle getTargetChannel method: return underlying Channel.				return this.target;			}			else if (methodName.equals("isOpen")) {				// Handle isOpen method: we are closed if the target is closed				return this.target != null && this.target.isOpen();			}			try {				if (this.target == null || !this.target.isOpen()) {					this.target = null;				}				synchronized (targetMonitor) {					if (this.target == null) {						this.target = createBareChannel(theConnection, transactional);					}					return method.invoke(this.target, args);				}			}			catch (InvocationTargetException ex) {				if (this.target == null || !this.target.isOpen()) {					// Basic re-connection logic...					this.target = null;					if (logger.isDebugEnabled()) {						logger.debug("Detected closed channel on exception.  
Re-initializing: " + target);					}					synchronized (targetMonitor) {						if (this.target == null) {							this.target = createBareChannel(theConnection, transactional);						}					}				}				throw ex.getTargetException();			}		}


9,调用rabbitmq的ChannelN类的basicPublish方法(继承AMQChannel,实现接口com.rabbitmq.client.Channel)

 /**
  * Builds a {@code Basic.Publish} command from the arguments and transmits it.
  *
  * @param exchange   set on the publish method
  * @param routingKey set on the publish method
  * @param mandatory  set on the publish method
  * @param immediate  set on the publish method
  * @param props      content properties; {@code MessageProperties.MINIMAL_BASIC}
  *                   is used when this is {@code null}
  * @param body       the message body
  * @throws IOException declared by this method
  */
 public void basicPublish(String exchange, String routingKey,
                          boolean mandatory, boolean immediate,
                          BasicProperties props, byte[] body)
     throws IOException
 {
     // NOTE(review): a positive nextPublishSeqNo presumably means publisher
     // confirms are enabled on this channel - confirm against the client source.
     if (nextPublishSeqNo > 0) {
         unconfirmedSet.add(getNextPublishSeqNo());
         nextPublishSeqNo++;
     }
     BasicProperties useProps = props;
     if (props == null) {
         useProps = MessageProperties.MINIMAL_BASIC;
     }
     transmit(new AMQCommand(new Basic.Publish.Builder()
                                 .exchange(exchange)
                                 .routingKey(routingKey)
                                 .mandatory(mandatory)
                                 .immediate(immediate)
                             .build(),
                             useProps, body));
 }


10,调用AMQChannel的transmit方法

    /**
     * Transmits a command on this channel after checking that the channel is
     * open; both steps run while holding {@code _channelMutex}.
     *
     * @param c the command to transmit
     * @throws IOException declared by this method
     */
    public void transmit(AMQCommand c) throws IOException {
        synchronized (_channelMutex) {
            ensureIsOpen();
            quiescingTransmit(c);
        }
    }

11,调用quiescingTransmit方法

    public void quiescingTransmit(AMQCommand c) throws IOException {        synchronized (_channelMutex) {            if (c.getMethod().hasContent()) {                while (_blockContent) {                    try {                        _channelMutex.wait();                    } catch (InterruptedException e) {}                    // This is to catch a situation when the thread wakes up during                    // shutdown. Currently, no command that has content is allowed                    // to send anything in a closing state.                    ensureIsOpen();                }            }            c.transmit(this);        }    }


12,调用AMQCommand的transmit方法

 /**
  * Writes this command to the channel's connection as a sequence of frames:
  * the method frame first and, when the method has content, a content header
  * frame followed by the body split into fragments. The connection is flushed
  * after the frames have been written.
  *
  * @param channel supplies the channel number and the connection to write to
  * @throws IOException declared by this method
  */
 public void transmit(AMQChannel channel) throws IOException {
     int channelNumber = channel.getChannelNumber();
     AMQConnection connection = channel.getConnection();

     synchronized (assembler) {
         Method m = this.assembler.getMethod();
         connection.writeFrame(m.toFrame(channelNumber));
         if (m.hasContent()) {
             byte[] body = this.assembler.getContentBody();

             connection.writeFrame(this.assembler.getContentHeader()
                     .toFrame(channelNumber, body.length));

             // A frameMax of 0 makes the fragment size equal to the body length,
             // so the whole body goes out in one frame; otherwise each fragment
             // carries at most frameMax - EMPTY_FRAME_SIZE bytes.
             int frameMax = connection.getFrameMax();
             int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
                     - EMPTY_FRAME_SIZE;

             for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                 int remaining = body.length - offset;

                 // The last fragment may be shorter than bodyPayloadMax.
                 int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                         : bodyPayloadMax;
                 Frame frame = Frame.fromBodyFragment(channelNumber, body,
                         offset, fragmentLength);
                 connection.writeFrame(frame);
             }
         }
     }

     // The flush happens outside the synchronized block.
     connection.flush();
 }


13,调用AMQConnection的writeFrame方法发送数据

 /**
  * Hands the frame to the frame handler and then signals activity to the
  * heartbeat sender.
  *
  * @param f the frame to write
  * @throws IOException declared by this method
  */
 public void writeFrame(Frame f) throws IOException {
     _frameHandler.writeFrame(f);
     _heartbeatSender.signalActivity();
 }


14,调用SocketFrameHandler的writeFrame方法,最终由Frame的writeTo方法把帧写入输出流,完成发送

    /**
     * Public API - writes this Frame to the given DataOutputStream.
     * <p>
     * The output order is: type (one byte), channel (short), payload size
     * (int), the payload bytes, and finally {@code AMQP.FRAME_END}.
     *
     * @param os the stream to write to
     * @throws IOException declared by this method
     */
    public void writeTo(DataOutputStream os) throws IOException {
        os.writeByte(type);
        os.writeShort(channel);
        // The payload comes from the accumulator when one is present,
        // otherwise from the payload array.
        if (accumulator != null) {
            os.writeInt(accumulator.size());
            accumulator.writeTo(os);
        } else {
            os.writeInt(payload.length);
            os.write(payload);
        }
        os.write(AMQP.FRAME_END);
    }

 

附图:

 

 

 

spring的RabbitTemplate 发送Message源码导读