
A look at rocketmq's ProducerImpl

go4it
Published 2018/07/28 21:25

This article takes a look at rocketmq's ProducerImpl, the OpenMessaging producer adapter.

ProducerImpl

io/openmessaging/rocketmq/producer/ProducerImpl.java

public class ProducerImpl extends AbstractOMSProducer implements Producer {

    public ProducerImpl(final KeyValue properties) {
        super(properties);
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public SendResult send(final Message message) {
        return send(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public SendResult send(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return send(message, timeout);
    }

    private SendResult send(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
            if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                log.error(String.format("Send message to RocketMQ failed, %s", message));
                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
            }
            message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
            return OMSUtil.sendResultConvert(rmqResult);
        } catch (Exception e) {
            log.error(String.format("Send message to RocketMQ failed, %s", message), e);
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message) {
        return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return sendAsync(message, timeout);
    }

    private Promise<SendResult> sendAsync(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        final Promise<SendResult> promise = new DefaultPromise<>();
        try {
            this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                @Override
                public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
                    promise.set(OMSUtil.sendResultConvert(rmqResult));
                }

                @Override
                public void onException(final Throwable e) {
                    promise.setFailure(e);
                }
            }, timeout);
        } catch (Exception e) {
            promise.setFailure(e);
        }
        return promise;
    }

    @Override
    public void sendOneway(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            this.rocketmqProducer.sendOneway(rmqMessage);
        } catch (Exception ignore) { //Ignore the oneway exception.
        }
    }

    @Override
    public void sendOneway(final Message message, final KeyValue properties) {
        sendOneway(message);
    }
}
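  • The send methods mostly delegate to rocketmqProducer; the synchronous path treats any status other than SEND_OK as a failure, and on success writes the RocketMQ message id into the message's MESSAGE_ID header
  • When the caller's properties contain OPERATION_TIMEOUT, that value is used as the send timeout; otherwise rocketmqProducer.getSendMsgTimeout() applies. sendOneway(message, properties) ignores its properties, and sendOneway swallows any exception
  • Before sending, msgConvert from OMSUtil converts the API's BytesMessage into an org.apache.rocketmq.common.message.Message
  • The asynchronous path returns a DefaultPromise that is completed from a SendCallback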
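
The timeout selection in send and sendAsync is a per-call override with a fallback to the producer default. A minimal sketch of that pattern follows, assuming a plain Map in place of KeyValue; the class name, the method name, and the key "demo.timeout" are hypothetical and are not part of the OpenMessaging or RocketMQ API.

```java
import java.util.HashMap;
import java.util.Map;

class TimeoutFallbackDemo {
    // Hypothetical key for illustration; the real code uses PropertyKeys.OPERATION_TIMEOUT.
    static final String TIMEOUT_KEY = "demo.timeout";

    // Use the per-call timeout when the caller supplied one, else the default.
    static long resolveTimeout(Map<String, Integer> properties, long defaultTimeout) {
        return properties.containsKey(TIMEOUT_KEY)
            ? properties.get(TIMEOUT_KEY)
            : defaultTimeout;
    }

    public static void main(String[] args) {
        Map<String, Integer> perCall = new HashMap<>();
        System.out.println(resolveTimeout(perCall, 3000L)); // no override: default applies
        perCall.put(TIMEOUT_KEY, 500);
        System.out.println(resolveTimeout(perCall, 3000L)); // override applies
    }
}
```

As in the original, the override is read as an int and widened to long, so the default only matters when the key is absent.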

OMSUtil.msgConvert

io/openmessaging/rocketmq/utils/OMSUtil.java

    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
        rmqMessage.setBody(omsMessage.getBody());

        KeyValue headers = omsMessage.headers();
        KeyValue properties = omsMessage.properties();

        //All destinations in RocketMQ use Topic
        if (headers.containsKey(MessageHeader.TOPIC)) {
            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
        } else {
            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
        }

        for (String key : properties.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
        }

        //Headers has a high priority
        for (String key : headers.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
        }

        return rmqMessage;
    }
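  • This copies the body, sets the topic from the TOPIC header (falling back to the QUEUE header), and records which of the two was used in the MESSAGE_DESTINATION user property
  • Properties are copied before headers, so a header overwrites a property that has the same key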
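
The "headers have a high priority" rule comes purely from the order of the two copy loops. A small sketch of that ordering, assuming plain maps instead of KeyValue and a hypothetical merge helper (the keys used here are illustrative strings, not the real MessageHeader constants):

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderPriorityDemo {
    // Hypothetical stand-in for the two copy loops in msgConvert.
    static Map<String, String> merge(Map<String, String> properties,
                                     Map<String, String> headers) {
        Map<String, String> merged = new LinkedHashMap<>();
        merged.putAll(properties); // user properties are copied first
        merged.putAll(headers);    // headers are copied last, so they win on a key clash
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("TOPIC", "from-properties");
        properties.put("traceId", "abc");

        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("TOPIC", "from-headers");

        // The clashing key keeps the header value; the other property survives.
        System.out.println(merge(properties, headers));
    }
}
```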

SendCallback

org/apache/rocketmq/client/producer/SendCallback.java

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}
  • On success the SendResult is passed to onSuccess; on failure the Throwable is passed to onException
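
A two-method callback like this is what lets an adapter turn a callback-style send into a promise. The sketch below illustrates the bridge that sendAsync performs, under loud assumptions: Callback and AsyncClient are hypothetical interfaces standing in for SendCallback and rocketmqProducer, and the JDK's CompletableFuture is swapped in for DefaultPromise.

```java
import java.util.concurrent.CompletableFuture;

class CallbackBridgeDemo {
    // Hypothetical callback shape mirroring onSuccess/onException.
    interface Callback<T> {
        void onSuccess(T result);
        void onException(Throwable e);
    }

    // Hypothetical callback-style client; not the RocketMQ producer API.
    interface AsyncClient {
        void send(String body, Callback<String> callback) throws Exception;
    }

    static CompletableFuture<String> sendAsync(AsyncClient client, String body) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        try {
            client.send(body, new Callback<String>() {
                @Override public void onSuccess(String result) {
                    promise.complete(result);
                }
                @Override public void onException(Throwable e) {
                    promise.completeExceptionally(e);
                }
            });
        } catch (Exception e) {
            // A failure while submitting is reported through the same promise.
            promise.completeExceptionally(e);
        }
        return promise;
    }

    public static void main(String[] args) {
        AsyncClient ok = (body, cb) -> cb.onSuccess("id-for-" + body);
        System.out.println(sendAsync(ok, "hello").join());

        AsyncClient broken = (body, cb) -> { throw new IllegalStateException("not started"); };
        System.out.println(sendAsync(broken, "hello").isCompletedExceptionally());
    }
}
```

Routing both the callback failure and the submission failure into the promise means the caller has a single place to observe errors, which matches the try/catch around the send call in sendAsync.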

DefaultPromise

io/openmessaging/rocketmq/promise/DefaultPromise.java

public class DefaultPromise<V> implements Promise<V> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
    private final Object lock = new Object();
    private volatile FutureState state = FutureState.DOING;
    private V result = null;
    private long timeout;
    private long createTime;
    private Throwable exception = null;
    private List<PromiseListener<V>> promiseListenerList;

    public DefaultPromise() {
        createTime = System.currentTimeMillis();
        promiseListenerList = new ArrayList<>();
        timeout = 5000;
    }

    //......

    @Override
    public boolean set(final V value) {
        if (value == null)
            return false;
        this.result = value;
        return done();
    }

    @Override
    public boolean setFailure(final Throwable cause) {
        if (cause == null)
            return false;
        this.exception = cause;
        return done();
    }

    private boolean done() {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }

            state = FutureState.DONE;
            lock.notifyAll();
        }

        notifyListeners();
        return true;
    }

    private void notifyListeners() {
        if (promiseListenerList != null) {
            for (PromiseListener<V> listener : promiseListenerList) {
                notifyListener(listener);
            }
        }
    }

    private void notifyListener(final PromiseListener<V> listener) {
        try {
            if (exception != null)
                listener.operationFailed(this);
            else
                listener.operationCompleted(this);
        } catch (Throwable t) {
            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
        }
    }

    //......
}
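  • Both set and setFailure reject a null argument, store the value or the exception, and then call done
  • done moves the state to DONE under the lock, wakes waiting threads with notifyAll, and then calls notifyListeners, which invokes each listener's operationFailed if an exception was recorded and operationCompleted otherwise; if the promise is no longer in the DOING state, done returns false and no listener is notified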
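
The complete-once behaviour can be illustrated with a much smaller promise. This is a simplified sketch, not the DefaultPromise implementation: MiniPromise and Listener are hypothetical names, timeouts and blocking reads are left out, and listeners registered after completion are not handled. One deliberate difference: in the excerpt above, set and setFailure assign the field before done checks the state, whereas this sketch assigns inside the lock so that a rejected second call cannot change the stored result.

```java
import java.util.ArrayList;
import java.util.List;

class MiniPromiseDemo {
    // Hypothetical listener shape, named after the two callbacks used above.
    interface Listener<V> {
        void operationCompleted(MiniPromise<V> promise);
        void operationFailed(MiniPromise<V> promise);
    }

    static class MiniPromise<V> {
        private final Object lock = new Object();
        private final List<Listener<V>> listeners = new ArrayList<>();
        private boolean done;
        private V result;
        private Throwable error;

        void addListener(Listener<V> listener) {
            synchronized (lock) { listeners.add(listener); }
        }

        boolean set(V value) {
            return value != null && complete(value, null);
        }

        boolean setFailure(Throwable cause) {
            return cause != null && complete(null, cause);
        }

        V result() { synchronized (lock) { return result; } }

        Throwable error() { synchronized (lock) { return error; } }

        // Only the first call wins; later calls return false and notify nobody.
        private boolean complete(V value, Throwable cause) {
            List<Listener<V>> snapshot;
            synchronized (lock) {
                if (done) return false;
                done = true;
                result = value;
                error = cause;
                lock.notifyAll(); // wake any thread waiting on the lock
                snapshot = new ArrayList<>(listeners);
            }
            // Listeners run outside the lock.
            for (Listener<V> listener : snapshot) {
                if (cause != null) listener.operationFailed(this);
                else listener.operationCompleted(this);
            }
            return true;
        }
    }

    public static void main(String[] args) {
        MiniPromise<String> promise = new MiniPromise<>();
        promise.addListener(new Listener<String>() {
            @Override public void operationCompleted(MiniPromise<String> p) {
                System.out.println("completed: " + p.result());
            }
            @Override public void operationFailed(MiniPromise<String> p) {
                System.out.println("failed: " + p.error());
            }
        });
        System.out.println(promise.set("msg-1")); // first completion is accepted
        System.out.println(promise.set("msg-2")); // second completion is rejected
    }
}
```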

Summary

  • ProducerImpl adapts RocketMQ's own rocketmqProducer to the OpenMessaging API
  • Asynchronous sends use an anonymous SendCallback that completes a custom DefaultPromise
