Kafka connect介绍、部署及开发

原创
2017/10/25 10:50
阅读数 1.5W

Kafak connect 简介

Kafaka connect 是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。Kafka Connect可以从数据库或应用程序服务器收集数据到Kafka topic,使数据可用于低延迟的流处理。导出作业可以将数据从Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。

Kafaka connect 概念

Kafaka connect 有几个重要的概念:

  • Source:负责导入数据到kafka
  • Sink:负责从kafka导出数据
  • Connectors :通过管理任务来协调数据流的高级抽象
  • Tasks:数据写入kafk和从kafka中读出的具体实现
  • Workers:运行connectors和tasks的进程
  • Converters:kafka connect和其他存储系统直接发送或者接受数据之间转换数据
  • Transforms:一种轻量级数据调整的工具

Kafka connect 工作模式

Kafka connect 有两种工作模式

  • standalone:在standalone模式中,所有的worker都在一个独立的进程中完成
  • distributed:distributed模式具有高扩展性,以及提供自动容错机制。你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。
    在分布式模式下通过rest api管理connector
  • GET /connectors – 返回所有正在运行的connector名。
  • POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
  • GET /connectors/{name} – 获取指定connetor的信息。
  • GET /connectors/{name}/config – 获取指定connector的配置信息。
  • PUT /connectors/{name}/config – 更新指定connector的配置信息。
  • GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
  • GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
  • GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
  • PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
  • PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
  • POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
  • POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
  • DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

Kafka connect 快速启动

Kafka connect 只是Apache Kafka提出的一种kafka数据流处理的框架,目前有很多开源的、优秀的实现,比较著名的是Confluent平台,支持很多Kafka connect的实现,例如Elasticsearch(Sink)、HDFS(Sink)、JDBC等等。下面主要介绍Confluent平台中kafka-connect-elasticsearch的使用。首先在Confluent官网上下载confluent-3.3.0。想运行kafka-connect-elasticsearch的前提是提供kafka服务以及ES服务。

standalone模式

1、首先更改配置connect-standalone.properties

#broker列表
bootstrap.servers=10.120.241.1:9200
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#是否需要schemas进行转码,我们使用的是json数据,所以设置成false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

2、更改配置quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=estest1012
key.ignore=true
schema.ignore=true
connection.url=http://10.120.241.194:9200
type.name=kafka-connect

注:需要配置schema.ignore=true,如果不配置会抛异常(由于使用的json数据)

3、启动kafka-connect-elasticsearch

./bin/connect-standalone ./etc/kafka/connect-standalone.properties  ./etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

4、测试效果
通过kafka-console-producer.sh发一条json格式的消息,然后查询es索引 输入图片说明

distribute模式

1、修改配置connect-distributed.properties

# broker列表
bootstrap.servers=10.120.241.1:9200

# 同一集群中group.id需要配置一致,且不能和别的消费者同名
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 使用json数据同样配置成false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
····

2、 手动创建集群模式所必须的kafka的几个topic

# config.storage.topic=connect-configs
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact

# offset.storage.topic=connect-offsets
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact

# status.storage.topic=connect-status
$ $ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact


  • config.storage.topic:topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic
  • offset.storage.topic:用于存储offsets;这个topic应该配置多个partition和副本。
  • status.storage.topic:用于存储状态;这个topic 可以有多个partitions和副本

3、 启动worker

./bin/connect-distributed ./etc/kafka/connect-distributed.properties   

4、使用restful启动connect

curl 'http://localhost:8083/connectors' -X POST -i -H "Content-Type:application/json" -d   
    '{ "name":"elasticsearch-sink",  
       "config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",  
                "tasks.max":10,  
                "topics":"estest1012",  
                "key.ignore":true,  
                "schema.ignore":true,  
                "connection.url":"http://10.120.241.194:9200",  
                "type.name":"kafka-connect"}  
    }'

5、 查看所有connnect,以及状态

输入图片说明 输入图片说明
6、 配置日志
默认情况下日志只在控制台输出,如需要保存文件需要修改配置connect-log4j.properties,例如下:

log4j.rootLogger=INFO, stdout, stdfile

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.appender.stdfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stdfile.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stdfile.File=${kafka.logs.dir}/stdout.log
log4j.appender.stdfile.layout=org.apache.log4j.PatternLayout
log4j.appender.stdfile.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR

Kafak connect 开发

本文将开发一个kafka-connect示例,主要的功能是将kafka中的消息持久化到文件中。

  • 新建maven工程,并添加connect-api依赖
<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>connect-api</artifactId>
			<version>${kafka.version}</version>
</dependency>
  • 主要是实现两个类
public class NeteaseFileSinkConnector extends SinkConnector{

	/**
	 * 配置项
	 */
	public static final String FILE_CONFIG = "file";
	/**
	 * 配置校验
	 */
	private static final ConfigDef CONFIG_DEF = new ConfigDef()
	        .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used");

	private String filename;
	
	@Override
	public ConfigDef config() {
		// TODO Auto-generated method stub
		return CONFIG_DEF;
	}

	/**
	 * 从配置文件中初始化配置
	 */
	@Override
	public void start(Map<String, String> props) {
		// TODO Auto-generated method stub
		filename = props.get(FILE_CONFIG);
	}

	@Override
	public void stop() {
		// TODO Auto-generated method stub
		
	}

	/**
	 * 返回执行持久化的任务类
	 */
	@Override
	public Class<? extends Task> taskClass() {
		// TODO Auto-generated method stub
		 return FileStreamSinkTask.class;
	}

	/**
	 * 返回配置
	 */
	@Override
	public List<Map<String, String>> taskConfigs(int maxTasks) {
		// TODO Auto-generated method stub
		ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<String, String>();
            if (filename != null)
                config.put(FILE_CONFIG, filename);
            configs.add(config);
        }
        return configs;
	}

	@Override
	public String version() {
		// TODO Auto-generated method stub
		return AppInfoParser.getVersion();
	}

}


/**
 * 持久化任务类
 * @author weifu
 *
*/
public class FileStreamSinkTask extends SinkTask {

	private String filename;
    private PrintStream outputStream;
	
	public String version() {
		// TODO Auto-generated method stub
		return new NeteaseFileSinkConnector().version();
	}

	@Override
	public void start(Map<String, String> props) {
		// TODO Auto-generated method stub
		filename = props.get(NeteaseFileSinkConnector.FILE_CONFIG);
        if (filename == null) {
            outputStream = System.out;
        } else {
            try {
                outputStream = new PrintStream(new FileOutputStream(filename, true), false,
                    StandardCharsets.UTF_8.name());
            } catch (FileNotFoundException | UnsupportedEncodingException e) {
                throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
            }
        }
	}

	/**
	 * 持久化具体过程
	 */
	@Override
	public void put(Collection<SinkRecord> records) {
		// TODO Auto-generated method stub
		for(SinkRecord record : records){
			outputStream.println("netease file connect: ");
			outputStream.println(record.value());
		}
	}
	
	
	@Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        outputStream.flush();
    }

	/**
	 * 关闭文件
	 */
	@Override
	public void stop() {
		// TODO Auto-generated method stub
		if(outputStream != null && outputStream != System.out){
			outputStream.close();
		}
	}
}

  • 打包 打包需要注意的是:
    A、 不能把Kafka Connect API打包在内。
    B、 把所依赖的jar包统一放到服务器的某个路径下(需要在配置文件中添加到classpath),推荐在..\confluent-3.3.0\share\java目录下建立一个文件夹(以kafka-connect-….开头命名),这样confluent会自动把该路径加载的classpath中。
  • 在分布式模式下添加该connect
  • 如下图消费两条消息后可以看到文件test.sink.txt已经有内容了:
    输入图片说明
展开阅读全文
打赏
0
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部