文档章节

kafka-请求头部解析

T
 Thinking--
发布于 2017/07/25 23:33
字数 1639
阅读 24
收藏 0
点赞 0
评论 0

上篇介绍了NetworkReceive

当接收到NetworkReceive, Processor会构造了Request实例,发送给RequestChannel

  private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
    val openChannel = selector.channel(receive.source)
    val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
    // 创建Request实例
     val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
        requestChannel.sendRequest(req)
        selector.mute(receive.source)
    }
  }

Request

Request表示请求,它有两个主要的属性。

header是通用请求的头部

bodyAndSize是请求的数据部分,它根据不同类型的请求,返回不同的实例

case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
                     startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
    val requestId = buffer.getShort()
    // 这里只是为了支持v0版本的shutdown请求
    val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
      ControlledShutdownRequest.readFrom(buffer)
    else
      null

    val header: RequestHeader =
      if (requestObj == null) {
        buffer.rewind
        // 使用RequestHeader的类方法解析
        try RequestHeader.parse(buffer)
        catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
        }
      } else
        null
    val bodyAndSize: RequestAndSize =
      if (requestObj == null)
        try {
          if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
            new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
          }
          else
            // 根据apiKey,apiVersion和buffer,实例化RequestAndSize
            AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
        } catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
        }
      else
        null

    buffer = null

Type

Type类是基本的数据格式,它内置定义了常用的数据结构,方便数据的读取

public abstract class Type {
    // 进数据Object写入到ByteBuffer
    public abstract void write(ByteBuffer buffer, Object o);
    // 从ByteBuffer读取Object数据
    public abstract Object read(ByteBuffer buffer);
    // 验证Object是否合理
    public abstract Object validate(Object o);
    // 返回Object数据的大小
    public abstract int sizeOf(Object o);
    // 是否数据为null
    public boolean isNullable() {
        return false;
    }

自定义的Type, 有INT8,INT16,...STRING,BYTES,VARINT...。

下面以INT16为例。因为INT16占2个字节,刚好是Short类型

public static final Type INT16 = new Type() {
        @Override
        public void write(ByteBuffer buffer, Object o) {
            // 调用ByteBuffer的putShort方法
            buffer.putShort((Short) o);
        }

        @Override
        public Object read(ByteBuffer buffer) {
            // 调用ByteBuffer的getShort方法
            return buffer.getShort();
        }

        @Override
        public int sizeOf(Object o) {
            // 占两个字节
            return 2;
        }

        @Override
        public String toString() {
            return "INT16";
        }

        @Override
        public Short validate(Object item) {
            // 检验Object是否为Short类型
            if (item instanceof Short)
                return (Short) item;
            else
                throw new SchemaException(item + " is not a Short.");
        }
    };

Field

Field只是一些属性的集合类

public class Field {

    public static final Object NO_DEFAULT = new Object();
    // 位置,表明在Schema的位置
    final int index;
    // 名称
    public final String name;
    // 类型
    public final Type type;
    // 默认值
    public final Object defaultValue;
    // 解释文档
    public final String doc;
    final Schema schema;

    public Field(int index, String name, Type type, String doc, Object defaultValue) {
        this(index, name, type, doc, defaultValue, null);
    }

    public Field(String name, Type type, String doc, Object defaultValue) {
        this(-1, name, type, doc, defaultValue);
    }

    public Field(String name, Type type, String doc) {
        this(name, type, doc, NO_DEFAULT);
    }

Schema

Schema是Field的集合,Field在里面是有顺序的。它支持数据读取,返回Struct类型。

public class Schema extends Type {
    // fields列表,按照顺序排序
    private final Field[] fields;
    // 哈希表,用来通过string查看field
    private final Map<String, Field> fieldsByName;

    public Schema(Field... fs) {
        // 
        this.fields = new Field[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field field = fs[i];
            if (fieldsByName.containsKey(field.name))
                throw new SchemaException("Schema contains a duplicate field: " + field.name);
            // 实例Field,注意第一个参数i,表示位置
            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
            this.fieldsByName.put(fs[i].name, this.fields[i]);
        }
    }
    
    
    public Struct read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        // 按照fields的顺序,依次读取值,并且保存到objects列表
        for (int i = 0; i < fields.length; i++) {
            try {
                objects[i] = fields[i].type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        // Struct提供了方便的接口访问数据
        return new Struct(this, objects);
    }
    
    public void write(ByteBuffer buffer, Object o) {
        Struct r = (Struct) o;
        // 按照fields的顺序依次遍历
        for (Field field : fields) {
            try {
                // 调用Struct的get方法,获取值
                Object value = field.type().validate(r.get(field));
                // 写入到buffer
                field.type.write(buffer, value);
            } catch (Exception e) {
                throw new SchemaException("Error writing field '" + field.name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
    }

内置的Schema都在Protocol类里定义

public class Protocol {

    public static final Schema REQUEST_HEADER = new Schema(
                                 new Field("api_key", INT16, "The id of the request type."),
                                 new Field("api_version", INT16, "The version of the API."),
                                 new Field("correlation_id", INT32,
                                    "A user-supplied integer value that will be passed back with the response"),
                                 new Field("client_id",  NULLABLE_STRING,
                                    "A user specified identifier for the client making the request.", ""));
    ......

RequestHeader

RequestHeader表明通用的请求头部

RequestHeader数据结构

|api_key | api_version | correlation_id | client_id |

public class RequestHeader extends AbstractRequestResponse {

    private final short apiKey;
    private final short apiVersion;
    private final String clientId;
    private final int correlationId;

    public RequestHeader(Struct struct) {
        apiKey = struct.getShort(API_KEY_FIELD);
        apiVersion = struct.getShort(API_VERSION_FIELD);
        clientId = struct.getString(CLIENT_ID_FIELD);
        correlationId = struct.getInt(CORRELATION_ID_FIELD);
    }

     public static RequestHeader parse(ByteBuffer buffer) {
        // 使用内置的Schema,调用read方法读取
        return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
    }

AbstractRequest

在Request类中,调用AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer),初始化AbstractRequest。最后request和struct实例化RequestAndSize

public abstract class AbstractRequest extends AbstractRequestResponse {
    public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) {
        // 根据header.apiKey,取出ApiKeys
        ApiKeys apiKey = ApiKeys.forId(requestId);
        Struct struct = apiKey.parseRequest(version, buffer);
        AbstractRequest request;
        // 根据apiKey实例化不同的RequestAndSize
        switch (apiKey) {
            case PRODUCE:
                request = new ProduceRequest(struct, version);
                break;
            case FETCH:
                request = new FetchRequest(struct, version);
                break;
            case LIST_OFFSETS:
                request = new ListOffsetRequest(struct, version);
                break;
            case METADATA:
                request = new MetadataRequest(struct, version);
                break;
            case OFFSET_COMMIT:
                request = new OffsetCommitRequest(struct, version);
                break;
            case OFFSET_FETCH:
                request = new OffsetFetchRequest(struct, version);
                break;
        ....
        }
        return new RequestAndSize(request, struct.sizeOf());

ApiKeys

ApiKeys是一个枚举类型,它定义了header的apikey。通过这个apikey,可以解析出这个请求是什么类型的

public enum ApiKeys {
    PRODUCE(0, "Produce"),
    FETCH(1, "Fetch"),
    LIST_OFFSETS(2, "Offsets"),
    METADATA(3, "Metadata"),
    LEADER_AND_ISR(4, "LeaderAndIsr", true),
    ....   
    // id 到ApiKeys的value的列表
    private static final ApiKeys[] ID_TO_TYPE;
    private static final int MIN_API_KEY = 0;
    public static final int MAX_API_KEY;
    
    static {
        int maxKey = -1;
        // 更新maxKey
        for (ApiKeys key : ApiKeys.values())
            maxKey = Math.max(maxKey, key.id);
        ApiKeys[] idToType = new ApiKeys[maxKey + 1];
        // 更新idToType
        for (ApiKeys key : ApiKeys.values())
            idToType[key.id] = key;
        ID_TO_TYPE = idToType;
        MAX_API_KEY = maxKey;
    }
    .......
    
    // 根据获取id从ID_TO_TYPE获取相应的type
    public static ApiKeys forId(int id) {
        if (!hasId(id))
            throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`, it should be between `%s` " +
                    "and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY));
        return ID_TO_TYPE[id];
    }
    // 解析请求
    public Struct parseRequest(short version, ByteBuffer buffer) {
        return requestSchema(version).read(buffer);
    }
    // 返回对应version的Request Schema
    public Schema requestSchema(short version) {
        return schemaFor(Protocol.REQUESTS, version);
    }
    // 针对schemas列表,返回对应version的Schema
    private Schema schemaFor(Schema[][] schemas, short version) {
        // 检查version的值
        if (id > schemas.length)
            throw new IllegalArgumentException("No schema available for API key " + this);
        if (version < 0 || version > latestVersion())
            throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
        // 返回id对应的Schema列表,里面包含了不同的version
        Schema[] versions = schemas[id];
        if (versions[version] == null)
            throw new IllegalArgumentException("Unsupported version for API key " + this + ": " + version);
        // 返回version对应的
        return versions[version];
    }
    

Protocol

Protocol类,里面定义了很多apikey和schema的关系。

// REQUESTS是Schema的二维数据。一维坐标是key_id,二维坐标是version
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];

// produce request不同版本的集合
public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};

static {
        REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
        REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
        REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
        REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST;
        .......
}

概括

Schema由多个Field组成,Field主要包含了Type,name等属性。Schema负责从buffer解析数据,返回Struct结果。

Protocol包含了许多内置的Schema。

RequestHeader表明请求头部,就是使用了内置的Protocol.REQUEST_HEADER这个Schema解析。请求头部包含apiKey,version等属性。

ApiKeys包含了apikey的集合。它可以根据apikey和version找到对应的Schema。

AbstractRequest提供了根据apikey和version,解析和返回对应的Request的实例。

© 著作权归作者所有

共有 人打赏支持
T
粉丝 5
博文 45
码字总数 44403
作品 0
武汉
KAFKA本机部署成功远端无法访问解决

方案1.修改机器Hostname为本机IP,设置kafka的 server.properties 参数host.name为空。 方案2.kafka的server.properties添加参数advertised.host.name=本机IP 这样远端直接访问IP即可连接kaf...

屌丝Lee ⋅ 2016/06/23 ⋅ 2

Apache Kafka:下一代分布式消息系统

简介 Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。...

wenqi_arthur ⋅ 2015/09/11 ⋅ 0

Kafka设计解析(八)- Kafka事务机制与Exactly Once语义实现原理

写在前面的话 本文所有Kafka原理性的描述除特殊说明外均基于Kafka 1.0.0版本。 为什么要提供事务机制 Kafka事务机制的实现主要是为了支持 即正好一次语义 操作的原子性 有状态操作的可恢复性...

xiaomin0322 ⋅ 05/22 ⋅ 0

servlet解析演进(2-1)

上文说了简单的servlet解析过程。实际上,tomcat在解析servlet的 时候是吧连接器定位为满足以下三个条件: 1、必须事先接口org.apache.catalina.Connector 2. 必须创建请求对象,该请求对象的...

hyssop ⋅ 2016/01/24 ⋅ 0

servlet解析演进(2-2)

3 处理器开始处理请求 private void process(Socket socket) { boolean ok = true; boolean finishResponse = true; SocketInputStream input = null; OutputStream output = null; // Const......

hyssop ⋅ 2016/01/25 ⋅ 0

[spdy]初识——比http好在哪里

spdy是google自创的web传输协议,为的是改善http的性能,google说比较spdy和http,前者的性能要比后者快64%。是否属实我不知道,反正快64%这个数字让我对spdy产生了兴趣。 既然spdy好,那它好...

亭子happy ⋅ 2012/09/18 ⋅ 0

一文读懂 HTTP/2 特性

HTTP/2 是 HTTP 协议自 1999 年 HTTP 1.1 发布后的首个更新,主要基于 SPDY 协议。由互联网工程任务组(IETF)的 Hypertext Transfer Protocol Bis(httpbis)工作小组进行开发。该组织于201...

wantingyun ⋅ 2017/06/21 ⋅ 0

Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费...

Adel ⋅ 2016/01/29 ⋅ 0

Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个 Consumer消费(单播)或被所有Consumer消费...

pior ⋅ 2015/12/23 ⋅ 0

Scala下Play框架学习笔记(Body parsers)

什么是Body Parsers 一个HTTP请求是一个头部后面紧随着一个body,头部很小,可以在内存中缓存,因此Play的模型中使用了这个类。Body有时候也可能很长,以致于不能缓存,反而作为一种流而被建...

金明略 ⋅ 2016/12/29 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

JavaScript零基础入门——(八)JavaScript的数组

JavaScript零基础入门——(八)JavaScript的数组 欢迎大家回到我们的JavaScript零基础入门,上一节课我们讲了有关JavaScript正则表达式的相关知识点,便于大家更好的对字符串进行处理。这一...

JandenMa ⋅ 今天 ⋅ 0

sbt网络问题解决方案

转自:http://dblab.xmu.edu.cn/blog/maven-network-problem/ cd ~/.sbt/launchers/0.13.9unzip -q ./sbt-launch.jar 修改 vi sbt/sbt.boot.properties 增加一个oschina库地址: [reposit......

狐狸老侠 ⋅ 今天 ⋅ 0

大数据,必须掌握的10项顶级安全技术

我们看到越来越多的数据泄漏事故、勒索软件和其他类型的网络攻击,这使得安全成为一个热门话题。 去年,企业IT面临的威胁仍然处于非常高的水平,每天都会看到媒体报道大量数据泄漏事故和攻击...

p柯西 ⋅ 今天 ⋅ 0

Linux下安装配置Hadoop2.7.6

前提 安装jdk 下载 wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.7.6/hadoop-2.7.6.tar.gz 解压 配置 vim /etc/profile # 配置java环境变量 export JAVA_HOME=/opt/jdk1......

晨猫 ⋅ 今天 ⋅ 0

crontab工具介绍

crontab crontab 是一个用于设置周期性被执行的任务工具。 周期性执行的任务列表称为Cron Table crontab(选项)(参数) -e:编辑该用户的计时器设置; -l:列出该用户的计时器设置; -r:删除该...

Linux学习笔记 ⋅ 今天 ⋅ 0

深入Java多线程——Java内存模型深入(2)

5. final域的内存语义 5.1 final域的重排序规则 1.对于final域,编译器和处理器要遵守两个重排序规则: (1)在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用...

江左煤郎 ⋅ 今天 ⋅ 0

面试-正向代理和反向代理

面试-正向代理和反向代理 Nginx 是一个高性能的反向代理服务器,但同时也支持正向代理方式的配置。

秋日芒草 ⋅ 今天 ⋅ 0

Spring 依赖注入(DI)

1、Setter方法注入: 通过设置方法注入依赖。这种方法既简单又常用。 类中定义set()方法: public class HelloWorldOutput{ HelloWorld helloWorld; public void setHelloWorld...

霍淇滨 ⋅ 昨天 ⋅ 0

马氏距离与欧氏距离

马氏距离 马氏距离也可以定义为两个服从同一分布并且其协方差矩阵为Σ的随机变量之间的差异程度。 如果协方差矩阵为单位矩阵,那么马氏距离就简化为欧氏距离,如果协方差矩阵为对角阵,则其也...

漫步当下 ⋅ 昨天 ⋅ 0

聊聊spring cloud的RequestRateLimiterGatewayFilter

序 本文主要研究一下spring cloud的RequestRateLimiterGatewayFilter GatewayAutoConfiguration @Configuration@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMi......

go4it ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部