异步转同步
博客专区 > xpbug 的博客 > 博客详情
异步转同步
xpbug 发表于2年前
异步转同步
  • 发表于 2年前
  • 阅读 136
  • 收藏 3
  • 点赞 1
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

摘要: 后台服务使用Event Sourcing的架构越来越多,为了提高CPU利用率,提高后台服务的性能,减少响应时间,后台服务大部分使用异步消息来通讯。然而异步消息并不适合客户端的调用的使用方式。客户端调用大多还是使用同步的方式。如何将异步消息转化为同步消息?本文将给出设计模式。

下面是一个CS架构的部署, 服务端是SEDA类型的架构,消息传输完全是异步。而客户端在使用SDK向服务端发起请求。为了方便客户的使用,我们希望能提供同步方式的SDK。所以我们的SDK需要将异步消息转换成同步消息。如何做到这一点呢?

因为服务器需要保持CPU的充分利用,所以异步转同步的工作不应该由Server完成,而应该有SDK来完成。简单的讲,异步转同步的流程应该分成这么几个步骤:

  1. SDK发出请求

  2. SDK等待返回

  3. 服务端持续返回消息

  4. 返回的消息和请求匹配上

  5. SDK返回响应

把以上的步骤封装在SDK里面,上面的示意图可以演变为下面的图

在实现中,requestNo和Waiter是实现的关键。RequestNo使发出的请求和返回的响应能够匹配上,而Waiter则可以使SDK等待,当拿到响应以后,唤醒SDK,返回响应。

略微解释一下上图的流程:

  1. Client调用Sender准备发请求。Sender先生成一个requestNo,将requestNo贴到请求上。然后生成一个waiter,将requestNo和waiter放入Map中。Map中存有所有正在等待消息返回的waiter。然后Sender阻塞在waiter的standby(timeout)方法上。

  2. Sender将消息塞入发送队列。

  3. Connection从发送队列取出消息。

  4. Connection发消息给Server。

  5. Server持续异步返回各类消息。

  6. Connection将返回的消息放入接收队列。

  7. Receiver从接收队里中取出消息。

  8. Receiver从消息中获取requestNo,然后从Map中获取相应的waiter。并调用waiter的accept( )方法。

  9. Waiter接受了消息,并确认成功接收到所有的响应消息。唤醒Sender。

  10. Sender返回消息给client。

public class Sender {
    public IObject send(IObject request, long timeout) {
        String requestNo = getRequestNo();
        waiter = new Waiter(client, requestNo);
        client.waiters.put(requestNo, waiter);
        request.set("requestNo", requestNo);
        connection.send(request);
        waiter.standby(timeout);  // Block
        int state = waiter.getState(); // 0=waiting first, 1=waiting next, 2=timeout, 3=complete
        if (state == 3) {
            return waiter.getResult();
        } else {
            // create new response for failure.
        }
    }
}

public class Receiver {
    public void receive(IObject msg) {
        String requestNo = node.get("requestNo");
        Waiter waiter = client.waiters.get(requestNo);
        if (waiter != null) {
            if (waiter.accept(msg)) return;
        }
        // 如果消息不被接受,则发给MessageHandler来处理。        
        if (client.handler != null) {
            client.handler.handleMessage(msg);
        }
    }
}

public class Waiter {
    // 0=waiting first; 1=waiting second; 2=timeout; 3=complete 
    private int state=0;
    private List<IObject> messages;
    
    public synchronized void standby(long timeout) {
        try {
            this.wait(timeout);
        } catch (InterruptedException e) {
        }
        if (state < 2) {
            state = 2;
            client.waiters.remove(requestNo);
        }
    }
 
    public synchronized boolean accept(IObject msg) {
        if (state >= 2) {
            // waiter is closed
            return false;
        } 
         
        try {
            if (state == 0) {
                // 收到第一个返回的消息
                messages.add(msg)
                // 查看是否还有后续消息
                // 设置state的状态
                // 如果需要后续消息, return true
            }
            if (state == 1) {
                // 如果不是期待的消息类型, return false
                // 收到了后续消息
                messages.add(msg)
                // 设置状态
                // 如果需要后续消息, return true
            }
            state = 3;
            client.waiters.remove(requestNo);
            // 唤醒SDK
            this.notifyAll();
            return true;
        } catch (Exception e) {
            // 发生解析错误, 创建错误消息
            messages.add(err)
            state = 3;
            client.waiters.remove(requestNo);
            this.notifyAll();
            return true;
        } 
    }
    
    public synchronized IObject getResult() {
        // 将message列表中的消息合并成一个result
        return result;
    }
}


共有 人打赏支持
粉丝 298
博文 97
码字总数 125336
×
xpbug
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: