Kafka Connect深度解读之配置项处理

原创
05/28 15:26
阅读数 792

Kafka连接器配置

无论是开发源端还是接收端连接器,都需要定义一些外部系统的配置参数,例如身份验证信息等。用户在部署连接器时需要提供这些参数,由于此过程容易发生人为错误,因此对其进行验证非常重要。

Kafka Connect框架允许通过指定配置项的名称、类型、重要性、默认值和其他字段来定义配置项。

下面是一个示例:

ConfigDef config = new ConfigDef();
config.define (
                "hostname",
                ConfigDef.Type.STRING,
                "",
                ConfigDef.Importance.HIGH,
                "Hostname or IP where external system is located"
            );

有多个ConfigDef.define()方法,可接受各种参数的不同组合。

验证器

为了进行验证,可以为任何配置项提供验证器,并将其作为参数传递给ConfigDef.define()方法。可以使用内置的验证器或实现自定义验证器。验证器的ensureValid()方法接收当前为某配置项提供的名称和值,如果值无效,应抛出一个ConfigException异常。

public class CustomValidator implements ConfigDef.Validator { 

public void ensureValid(String name, Object value) {
    		if ( isValueInvalid() ) {
            		throw new ConfigException( "Invalid value: " + value);
    		}
  	}
}

ConfigDef.ValidatorAPI的一个限制是,它只能看到它验证的配置项的值,无法看到其他配置项的值。当配置项的有效性取决于其他配置项的值时,这可能是有问题的。

推荐器

除了验证器,还可以为某个配置项提供ConfigDef.Recommender实现。顾名思义,其目的是为某个配置项推荐有效的值。当某个配置项有一套固定的有效值时,这很有用。当然,有时配置项的有效值可能因其他配置项的当前值而有所不同。

假设正在构建一个源端连接器,该连接器使用SOAP请求从Web服务中获取数据。要允许连接器部署器选择适合此场景的身份验证类型,可以定义名为auth_type的配置项。但是,部署器提供的配置值必须与受支持的身份验证类型之一相匹配。

假设支持三种类型的身份验证:NONE(无身份验证)、BASIC(基于用户名和密码)和SSL_CERT(使用SSL客户端证书)。但是,仅在使用HTTPS协议时,SSL_CERT身份验证才会起作用,这时需要使用另一个名为use_https的布尔类型参数来指定。

推荐器的实现大致如下:

public class AuthTypeValidator implements ConfigDef.Recommender {

      @Override
      public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
      Boolean useHttps = (Boolean) parsedConfig.get("use_https");
      if(useHttps)
            return Arrays.asList("NONE", "BASIC", "SSL_CERT");
      else
            return Arrays.asList("NONE", "BASIC");
      }

      @Override
      public boolean visible(String name, Map<String, Object> parsedConfig) {
            return true;
      }
}

validValues()方法会返回配置项的有效值列表,第一个参数是配置参数名,第二个参数是所有已解析的配置项的映射,其中可以看到确定有效值所需的任何其他配置项。该方法返回空列表具有"允许任何值"的语义,这意味着推荐器不建议任何特定的配置值。这在推荐器仅用于确定配置项的可见性(相关性)的情况下是有意义的,如下所述。

第二个方法visible(),用于确定配置是否与其他配置项相关。在本示例中,auth_type不依赖于任何其他配置项。换句话说,它总是相关的,所以返回true。但是,其他配置项(如用户名和密码)只有在auth_type配置项设置为BASIC的情况下才会相关。

Kafka Connect框架允许通过提供具有任何配置定义的依赖列表来指定配置间依赖。在本例中,auth_type应作为依赖项传递给use_https配置项定义。

验证过程

Kafka Connect框架使用作为配置项定义一部分的验证器来验证配置项,然后才能创建连接器实例。SourceConnectorSinkConnector都扩展了Connector抽象类,后者提供了validate()方法的默认实现:

public Config validate(Map<String, String> connectorConfigs) {...}

该方法接收用户提供的配置映射,配置值以字符串形式传递。它返回Config类的实例,它基本上只是ConfigValue实例列表的包装器,从Config类源代码可以看到:

public class Config {
    private final List configValues;

    public Config(List configValues) {
        this.configValues = configValues;
    }

    public List configValues() {
        return configValues;
    }
}

对于连接器配置定义中定义的每个配置项,configValues列表将包含一个ConfigValue实例,该实例包装从用户输入解析的值,如果用户未提供该值,则将使用默认值。

如果配置项定义包含验证器,它将用于验证值。如果该值无效,则该参数的ConfigValue实例将包含一条错误消息。另外,如果配置项定义中包括了一个推荐器,则相应的ConfigValue实例将包含一个推荐的有效值列表。

当使用Kafka Connect的REST APIPOST /connectors请求创建连接器实例时,将调用该validate()方法。如果提供了无效的配置值,则响应将包含验证器抛出的ConfigException异常的错误消息,如下所示:

{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid authentication type: XXXXX\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

除了错误消息外,这种响应还建议使用另一个REST API端点:/connector-plugins/{connector-type}/config/validate,该端点将验证某连接器类型的配置,而不创建该连接器的实例。它返回有关每个配置项的详细信息,包括其定义、当前值(根据输入或默认值解析)、验证错误、推荐值和相关性。

依赖项验证

如推荐器部分所述,在某些情况下,配置项有效性取决于其他配置项的值。

ConfigDef.ValidatorAPI的一个缺点是它不了解可能为配置提供的推荐器。除了验证的值以外,该API还无法访问其他配置项。这意味着验证器只适用于配置值的有效性不依赖于其他配置项的值。当前,处理这种情况的唯一方法是重写连接器的validate()方法并“手动”验证此类配置项,而不是使用验证器。

不过该validate()方法的默认实现有一些繁重的工作,即将值解析为适当的类型,并在未提供值的地方设置默认值,但这方便了开发者。

在重写的方法调用super.validate(),然后仅手动重新验证相关字段,最后在返回结果之前更新验证结果,这似乎是一个最佳解决方案。这样,验证器仍然可以用于独立的字段。开发者只需要手动处理相关字段。此外,推荐器仍用于为依赖配置提供推荐值。

@Override
public Config validate(Map<String, String> connectorConfigs) {
    Config config = super.validate(connectorConfigs);
    ConfigValue authTypeCfg = getConfigValue(config,"auth_type");
    Object authTypeValue = authTypeCfg.value();
    if (!authTypeCfg.recommendedValues().contains(authTypeValue)) {
        authTypeCfg.addErrorMessage("Invalid auth type: " + authTypeValue);
    }
    return config;
}

private ConfigValue getConfigValue(Config config, String configName){
    return config.configValues().stream()
            .filter(value -> value.name().equals(configName) )
            .findFirst().get();
}

总结

连接器配置可以由一个或多个配置项以及一组有限的有效值组成,有时这些值取决于其他配置项的值。Kafka Connect框架提供了一个推荐器API,作为一种插入自定义逻辑的方法,以便开发者可以在计算允许值的同时考虑其他配置值。

验证器不了解推荐器或应验证的配置项,因此,在有效值依赖于其他配置项的情况下,验证器将无法提供所需的功能。对于此类配置项,可以通过覆盖连接器的validate()方法来完成最后的验证。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部