Nginx Kafka Data Production Interface


Introduction

        When producing data to Kafka, sending it to the cluster from an arbitrary server normally means first installing the Kafka client library for whatever language that machine uses, which is tedious. My requirement is simply to push large volumes of data into the Kafka cluster; the data is read out by a separate system. Is there anything that works out of the box, such as an HTTP interface? There are indeed a few projects on GitHub, including the one covered in an earlier post (that one is implemented as a full Kafka REST interface: feature-complete, but complex). The modules that appeared earliest were tested long ago and, for various project reasons, did not fully meet the requirements. Later a project by a Chinese developer turned up, and I took it and modified it to make do.

Open Source Projects

        1    ngx_kafka_module (thanks to brg_liuwei)

        2    a variant of the above (presented below)

Limitations

        ngx_kafka_module can only forward the body of POST requests to the Kafka cluster. Capturing variable values the way nginx does for its access log is not implemented yet; the author is still working on it.

Improvements

        For this project the module first needs to capture the values of the following variables and prepend them to the message (because of the processing phase the Kafka module runs in, not every nginx built-in variable is available):

$remote_addr|$time_local|$request|$http_user_agent|$request_body
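
For illustration, a message produced to Kafka then looks roughly like the line below; the field values are hypothetical, and the last field is the POST body.

192.0.2.10|09/Mar/2017:19:09:00 +0800|POST /kafka_topic HTTP/1.1|curl/7.29.0|guol123456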

Modified Code

        ngx_http_kafka_module.c:

/*
 * nginx kafka module
 *
 * using librdkafka: https://github.com/edenhill/librdkafka
 */

#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_string.h>
#include <stdint.h>

#include <alloca.h>
#include <librdkafka/rdkafka.h>
#include <errno.h>

#define KAFKA_TOPIC_MAXLEN 256
#define KAFKA_BROKER_MAXLEN 512

#define KAFKA_ERR_NO_DATA "no_message\n"
#define KAFKA_ERR_BODY_TO_LARGE "body_too_large\n"
#define KAFKA_ERR_PRODUCER "kafka_producer_error\n"

#define KAFKA_PARTITION_UNSET 0xFFFFFFFF

static ngx_int_t ngx_http_kafka_init_worker(ngx_cycle_t *cycle);
static void ngx_http_kafka_exit_worker(ngx_cycle_t *cycle);

static void *ngx_http_kafka_create_main_conf(ngx_conf_t *cf);
static void *ngx_http_kafka_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_kafka_merge_loc_conf(ngx_conf_t *cf,
        void *parent, void *child);
static char *ngx_http_set_kafka(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_broker_list(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_topic(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_partition(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_broker(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_http_kafka_handler(ngx_http_request_t *r);
static void ngx_http_kafka_post_callback_handler(ngx_http_request_t *r);

typedef enum {
    ngx_str_push = 0,
    ngx_str_pop = 1
} ngx_str_op;

static void ngx_str_helper(ngx_str_t *str, ngx_str_op op);

typedef struct {
    rd_kafka_t       *rk;
    rd_kafka_conf_t  *rkc;
    ngx_array_t      *broker_list;
} ngx_http_kafka_main_conf_t;

static char *ngx_http_kafka_main_conf_broker_add(ngx_http_kafka_main_conf_t *cf,
        ngx_str_t *broker);

typedef struct {
    ngx_str_t   topic;     /* kafka topic */
    ngx_str_t   broker;    /* broker addr (eg: localhost:9092) */

    /* kafka partition(0...N), default value: RD_KAFKA_PARTITION_UA */
    ngx_int_t   partition;

    rd_kafka_topic_t       *rkt;
    rd_kafka_topic_conf_t  *rktc;

} ngx_http_kafka_loc_conf_t;

static ngx_command_t ngx_http_kafka_commands[] = {
    {
        ngx_string("kafka"),
        NGX_HTTP_MAIN_CONF|NGX_CONF_NOARGS,
        ngx_http_set_kafka,
        NGX_HTTP_MAIN_CONF_OFFSET,
        0,
        NULL },
    {
        ngx_string("kafka_broker_list"),
        NGX_HTTP_MAIN_CONF|NGX_CONF_1MORE,
        ngx_http_set_kafka_broker_list,
        NGX_HTTP_MAIN_CONF_OFFSET,
        0,
        NULL },
    {
        ngx_string("kafka_topic"),
        NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
        ngx_http_set_kafka_topic,
        NGX_HTTP_LOC_CONF_OFFSET,
        offsetof(ngx_http_kafka_loc_conf_t, topic),
        NULL },
    {
        ngx_string("kafka_partition"),
        NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
        ngx_http_set_kafka_partition,
        NGX_HTTP_LOC_CONF_OFFSET,
        offsetof(ngx_http_kafka_loc_conf_t, partition),
        NULL },
    {
        ngx_string("kafka_broker"),
        NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
        ngx_http_set_kafka_broker,
        NGX_HTTP_LOC_CONF_OFFSET,
        offsetof(ngx_http_kafka_loc_conf_t, broker),
        NULL },
    ngx_null_command
};


static ngx_http_module_t ngx_http_kafka_module_ctx = {
    NULL,                             /* pre conf */
    NULL,                             /* post conf */

    ngx_http_kafka_create_main_conf,  /* create main conf */
    NULL,                             /* init main conf */

    NULL,                             /* create server conf */
    NULL,                             /* merge server conf */

    ngx_http_kafka_create_loc_conf,   /* create local conf */
    ngx_http_kafka_merge_loc_conf,    /* merge location conf */
};


ngx_module_t ngx_http_kafka_module = {
    NGX_MODULE_V1,
    &ngx_http_kafka_module_ctx,   /* module context */
    ngx_http_kafka_commands,      /* module directives */
    NGX_HTTP_MODULE,              /* module type */

    NULL,                         /* init master */
    NULL,                         /* init module */

    ngx_http_kafka_init_worker,   /* init process */
    NULL,                         /* init thread */

    NULL,                         /* exit thread */
    ngx_http_kafka_exit_worker,   /* exit process */
    NULL,                         /* exit master */

    NGX_MODULE_V1_PADDING
};


char *ngx_http_kafka_main_conf_broker_add(ngx_http_kafka_main_conf_t *cf,
        ngx_str_t *broker)
{
    ngx_str_t   *new_broker;

    new_broker = ngx_array_push(cf->broker_list);
    if (new_broker == NULL) {
        return NGX_CONF_ERROR;
    }

    *new_broker = *broker;
    return NGX_CONF_OK;
}


void *ngx_http_kafka_create_main_conf(ngx_conf_t *cf)
{
    ngx_http_kafka_main_conf_t  *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_kafka_main_conf_t));
    if (conf == NULL) {
        return NGX_CONF_ERROR;
    }

    conf->rk = NULL;
    conf->rkc = NULL;
    conf->broker_list = ngx_array_create(cf->pool, 4, sizeof(ngx_str_t));
    if (conf->broker_list == NULL) {
        return NULL;
    }

    return conf;
}


void *ngx_http_kafka_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_kafka_loc_conf_t  *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_kafka_loc_conf_t));
    if (conf == NULL) {
        return NGX_CONF_ERROR;
    }

    ngx_str_null(&conf->topic);
    ngx_str_null(&conf->broker);

    /*
     * We cannot initialize conf->partition to RD_KAFKA_PARTITION_UA here,
     * because RD_KAFKA_PARTITION_UA and NGX_CONF_UNSET are both -1.
     */
    conf->partition = KAFKA_PARTITION_UNSET;

    return conf;
}


char *ngx_http_kafka_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_kafka_loc_conf_t *prev = parent;
    ngx_http_kafka_loc_conf_t *conf = child;

#define ngx_conf_merge_kafka_partition_conf(conf, prev, def) \
    if (conf == KAFKA_PARTITION_UNSET) { \
        conf = (prev == KAFKA_PARTITION_UNSET) ? def : prev; \
    }

    ngx_conf_merge_kafka_partition_conf(conf->partition, prev->partition,
            RD_KAFKA_PARTITION_UA);

#undef ngx_conf_merge_kafka_partition_conf

    return NGX_CONF_OK;
}

void kafka_callback_handler(rd_kafka_t *rk,
        void *msg, size_t len, int err, void *opaque, void *msg_opaque)
{
    if (err != 0) {
        ngx_log_error(NGX_LOG_ERR, (ngx_log_t *)msg_opaque, 0,
                "%s", rd_kafka_err2str(err));
    }
}


char *ngx_http_set_kafka(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    /* we can add more code here to config ngx_http_kafka_main_conf_t */
    return NGX_CONF_OK;
}


char *ngx_http_set_kafka_broker_list(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf)
{
    char       *cf_result;
    ngx_uint_t  i;
    ngx_str_t  *value;

    ngx_http_kafka_main_conf_t *main_conf;

    main_conf = conf;
    value = cf->args->elts;

    for (i = 1; i < cf->args->nelts; ++i) {
        cf_result = ngx_http_kafka_main_conf_broker_add(main_conf, &value[i]);
        if (cf_result != NGX_CONF_OK) {
            return cf_result;
        }
    }

    return NGX_CONF_OK;
}


char *ngx_http_set_kafka_topic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                       *cf_result;
    ngx_http_core_loc_conf_t   *clcf;
    ngx_http_kafka_loc_conf_t  *local_conf;

    /* install ngx_http_kafka_handler */
    clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
    if (clcf == NULL) {
        return NGX_CONF_ERROR;
    }
    clcf->handler = ngx_http_kafka_handler;

    /* ngx_http_kafka_loc_conf_t::topic assignment */
    cf_result = ngx_conf_set_str_slot(cf, cmd, conf);
    if (cf_result != NGX_CONF_OK) {
        return cf_result;
    }

    local_conf = conf;

    local_conf->rktc = rd_kafka_topic_conf_new();

    return NGX_CONF_OK;
}


char *ngx_http_set_kafka_partition(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char  *p = conf;

    ngx_int_t        *np; 
    ngx_str_t        *value;


    np = (ngx_int_t *)(p + cmd->offset);

    if (*np != KAFKA_PARTITION_UNSET) {
        return "is duplicate";
    }    

    value = cf->args->elts;

    if (ngx_strncmp("auto", (const char *)value[1].data, value[1].len) == 0) {
        *np = RD_KAFKA_PARTITION_UA;
    } else {
        *np = ngx_atoi(value[1].data, value[1].len);
        if (*np == NGX_ERROR) {
            return "invalid number";
        }
    }

    return NGX_CONF_OK;
}


char *ngx_http_set_kafka_broker(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                       *cf_result;
    ngx_http_kafka_loc_conf_t  *local_conf;
    ngx_http_kafka_main_conf_t *main_conf;

    /* ngx_http_kafka_loc_conf_t::broker assignment */
    cf_result = ngx_conf_set_str_slot(cf, cmd, conf);
    if (cf_result != NGX_CONF_OK) {
        return cf_result;
    }

    local_conf = conf;

    main_conf = ngx_http_conf_get_module_main_conf(cf, ngx_http_kafka_module);
    if (main_conf == NULL) {
        return NGX_CONF_ERROR;
    }
    return ngx_http_kafka_main_conf_broker_add(main_conf, &local_conf->broker);
}


static ngx_int_t ngx_http_kafka_handler(ngx_http_request_t *r)
{
    ngx_int_t  rv;

    if (!(r->method & NGX_HTTP_POST)) {
        return NGX_HTTP_NOT_ALLOWED;
    }

    rv = ngx_http_read_client_request_body(r, ngx_http_kafka_post_callback_handler);
    if (rv >= NGX_HTTP_SPECIAL_RESPONSE) {
        return rv;
    }

    return NGX_DONE;
}

// get remote_addr_ip
static int ngx_http_variable_remote_addr(ngx_http_request_t *r, char *ip_addr, uint32_t *ip_len)
{
	if (!r || !ip_addr || !ip_len)
		return -1;

	*ip_len = r->connection->addr_text.len;
	strncpy(ip_addr, (const char *)r->connection->addr_text.data, *ip_len);
	return 0;
}


static int32_t ngx_construct_log_prefix(ngx_http_request_t *r, u_char *in_buf, size_t in_buf_len, u_char **out_buf, size_t *p_out_buf_len)
{
	//u_char str_buf[1000];

	// remote_addr
	if (r == NULL || in_buf == NULL || in_buf_len == 0)
		return -1;
	uint32_t ip_len = 0;
	if (ngx_http_variable_remote_addr(r, (char*)in_buf, &ip_len) == 0) {
		in_buf[ip_len] = '|';
	} else
		return -2;

	if ((ip_len + 1) > in_buf_len)
		return -999;
	*p_out_buf_len += (ip_len + 1);


	// time_local: look up the nginx variable by its lowercased name + hash key
	u_char buf[1000];  // scratch buffer for the hashed variable name
	int keylen = snprintf((char*)buf, sizeof(buf), "%s", "time_local");
	ngx_uint_t ikey = ngx_hash_strlow(buf, buf, keylen);
	ngx_str_t name = ngx_string("time_local");
	ngx_variable_value_t *pvar = ngx_http_get_variable(r, &name, ikey);

	if (pvar == NULL || pvar->not_found)
		return -3;

	// variable data is not NUL-terminated: copy exactly pvar->len bytes
	if (*p_out_buf_len + pvar->len + 1 > in_buf_len)
		return -999;
	ngx_memcpy(in_buf + *p_out_buf_len, pvar->data, pvar->len);
	in_buf[*p_out_buf_len + pvar->len] = '|';
	*p_out_buf_len += pvar->len + 1;

	// request
	keylen = snprintf((char*)buf, sizeof(buf), "%s", "request");
	ikey = ngx_hash_strlow(buf, buf, keylen);
	ngx_str_t req_name = ngx_string("request");
	pvar = ngx_http_get_variable(r, &req_name, ikey);

	if (pvar == NULL || pvar->not_found)
		return -4;

	if (*p_out_buf_len + pvar->len + 1 > in_buf_len)
		return -999;
	ngx_memcpy(in_buf + *p_out_buf_len, pvar->data, pvar->len);
	in_buf[*p_out_buf_len + pvar->len] = '|';
	*p_out_buf_len += pvar->len + 1;

	// http_user_agent
	keylen = snprintf((char*)buf, sizeof(buf), "%s", "http_user_agent");
	ikey = ngx_hash_strlow(buf, buf, keylen);
	ngx_str_t agent_name = ngx_string("http_user_agent");
	pvar = ngx_http_get_variable(r, &agent_name, ikey);

	if (pvar == NULL || pvar->not_found)
		return -7;

	if (*p_out_buf_len + pvar->len + 1 > in_buf_len)
		return -999;
	ngx_memcpy(in_buf + *p_out_buf_len, pvar->data, pvar->len);
	in_buf[*p_out_buf_len + pvar->len] = '|';
	*p_out_buf_len += pvar->len + 1;

	*out_buf = in_buf;
	return 0;
}

static void ngx_http_kafka_post_callback_handler(ngx_http_request_t *r)
{
    int                          rc, nbufs;
    u_char                      *msg, *err_msg;
    size_t                       len, err_msg_size;
    ngx_log_t                   *conn_log;
    ngx_buf_t                   *buf;
    ngx_chain_t                  out;
    ngx_chain_t                 *cl, *in;
    ngx_http_request_body_t     *body;
    ngx_http_kafka_main_conf_t  *main_conf;
    ngx_http_kafka_loc_conf_t   *local_conf;

    err_msg = NULL;
    err_msg_size = 0;

    main_conf = NULL;

    /* get body */
    body = r->request_body;
    if (body == NULL || body->bufs == NULL) {
        err_msg = (u_char *)KAFKA_ERR_NO_DATA;
        err_msg_size = sizeof(KAFKA_ERR_NO_DATA);
        r->headers_out.status = NGX_HTTP_OK;
        goto end;
    }

    /* calc len and bufs */
    len = 0;
    nbufs = 0;
    in = body->bufs;
    for (cl = in; cl != NULL; cl = cl->next) {
        nbufs++;
        len += (size_t)(cl->buf->last - cl->buf->pos);
    }

    /* get msg */
    if (nbufs == 0) {
        err_msg = (u_char *)KAFKA_ERR_NO_DATA;
        err_msg_size = sizeof(KAFKA_ERR_NO_DATA);
        r->headers_out.status = NGX_HTTP_OK;
        goto end;
    }


    ////////////////////////////////////////////////////////////////////////////
    int in_memory = 0;   // 0: not in memory, 1: in memory

    if (nbufs == 1 && ngx_buf_in_memory(in->buf)) {

        msg = in->buf->pos;
        in_memory = 1;

    } else {

        if ((msg = ngx_pnalloc(r->pool, len )) == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        //in_memory = 0;   // not in memory

        for (cl = in; cl != NULL; cl = cl->next) {
            if (ngx_buf_in_memory(cl->buf)) {
                msg = ngx_copy(msg, cl->buf->pos, cl->buf->last - cl->buf->pos);
            } else {
                /* TODO: handle buf in file */
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                        "ngx_http_kafka_handler cannot handle in-file-post-buf");

                err_msg = (u_char *)KAFKA_ERR_BODY_TO_LARGE;
                err_msg_size = sizeof(KAFKA_ERR_BODY_TO_LARGE);
                r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;

                goto end;
            }
        }
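        /* ngx_copy advanced msg while copying; rewind it to the start of the assembled body */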
        msg -= len;

    }

    /* send to kafka */
    main_conf = ngx_http_get_module_main_conf(r, ngx_http_kafka_module);
    local_conf = ngx_http_get_module_loc_conf(r, ngx_http_kafka_module);
    if (local_conf->rkt == NULL) {
        ngx_str_helper(&local_conf->topic, ngx_str_push);
        local_conf->rkt = rd_kafka_topic_new(main_conf->rk,
                (const char *)local_conf->topic.data, local_conf->rktc);
        ngx_str_helper(&local_conf->topic, ngx_str_pop);
    }

    /*
     * The last param should NOT be r->connection->log, because the
     * callback handler (kafka_callback_handler) is invoked asynchronously
     * when an error happens.
     *
     * By that time ngx_http_finalize_request may already have been called
     * and the request object r destroyed, yet kafka_callback_handler would
     * still dereference r->connection->log and crash the worker process.
     *
     * Thanks to the engineers of www.360buy.com for reporting this bug.
     */
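    /* stack buffer that receives the log prefix and, when it fits, the copied request body (note: 1 MB on the worker stack) */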
    u_char buffer[1024 * 1024];
    size_t out_buf_len = 0;
    u_char *p_out_buf = NULL;
    if (ngx_construct_log_prefix(r, buffer, sizeof(buffer), &p_out_buf, &out_buf_len) < 0)
    	goto end;

    void *kafka_msg = NULL;
    size_t all_len = out_buf_len + len;
    int new_memory = 0;

    if (all_len <= sizeof(buffer)) {
        memcpy(buffer + out_buf_len, msg, len);
    } else {
        /* prefix + body does not fit in the stack buffer:
         * assemble the message in a temporary heap allocation instead */
        new_memory = 1;
        kafka_msg = malloc(all_len);
        if (kafka_msg == NULL)
            goto end;

        memcpy(kafka_msg, buffer, out_buf_len);
        memcpy((char *)kafka_msg + out_buf_len, msg, len);
    }



    conn_log = r->connection->log;
    if (new_memory) {
        rc = rd_kafka_produce(local_conf->rkt, (int32_t)local_conf->partition,
                RD_KAFKA_MSG_F_COPY, kafka_msg, all_len, NULL, 0, conn_log);
    } else {
        rc = rd_kafka_produce(local_conf->rkt, (int32_t)local_conf->partition,
                RD_KAFKA_MSG_F_COPY, (void *)buffer, all_len, NULL, 0, conn_log);
    }

    if (rc != 0) {
        ngx_log_error(NGX_LOG_ERR, conn_log, 0, "%s",
                rd_kafka_err2str(rd_kafka_errno2err(errno)));

        err_msg = (u_char *)KAFKA_ERR_PRODUCER;
        err_msg_size = sizeof(KAFKA_ERR_PRODUCER);
        r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    if (new_memory && kafka_msg != NULL) {
    	free(kafka_msg);
    	kafka_msg = NULL;
    }

end:

    if (err_msg != NULL) {
        buf = ngx_pcalloc(r->pool, sizeof(ngx_buf_t));
        out.buf = buf;
        out.next = NULL;
        buf->pos = err_msg;
        buf->last = err_msg + err_msg_size - 1;
        buf->memory = 1;
        buf->last_buf = 1;

        ngx_str_set(&(r->headers_out.content_type), "text/html");
        ngx_http_send_header(r);
        ngx_http_output_filter(r, &out);
    } else {
        r->headers_out.status = NGX_HTTP_NO_CONTENT;
        ngx_http_send_header(r);
    }

    ngx_http_finalize_request(r, NGX_OK);

    if (main_conf != NULL) {
        rd_kafka_poll(main_conf->rk, 0);
    }
}


ngx_int_t ngx_http_kafka_init_worker(ngx_cycle_t *cycle)
{
    ngx_uint_t                   n;
    ngx_str_t                   *broker_list;
    ngx_http_kafka_main_conf_t  *main_conf;

    main_conf = ngx_http_cycle_get_module_main_conf(cycle,
            ngx_http_kafka_module);
    main_conf->rkc = rd_kafka_conf_new();
    rd_kafka_conf_set_dr_cb(main_conf->rkc, kafka_callback_handler);
    main_conf->rk = rd_kafka_new(RD_KAFKA_PRODUCER, main_conf->rkc, NULL, 0);

    broker_list = main_conf->broker_list->elts;

    for (n = 0; n < main_conf->broker_list->nelts; ++n) {
        ngx_str_helper(&broker_list[n], ngx_str_push);
        rd_kafka_brokers_add(main_conf->rk, (const char *)broker_list[n].data);
        ngx_str_helper(&broker_list[n], ngx_str_pop);
    }

    return NGX_OK;
}


void ngx_http_kafka_exit_worker(ngx_cycle_t *cycle)
{
    ngx_http_kafka_main_conf_t  *main_conf;

    main_conf = ngx_http_cycle_get_module_main_conf(cycle,
            ngx_http_kafka_module);

    rd_kafka_poll(main_conf->rk, 0);

    while (rd_kafka_outq_len(main_conf->rk) > 0) {
        rd_kafka_poll(main_conf->rk, 100);
    }

    // TODO: rd_kafka_topic_destroy(each loc conf rkt);
    rd_kafka_destroy(main_conf->rk);
}


/*
 * Temporarily NUL-terminate an ngx_str_t (push) so its data can be passed to
 * C string APIs such as librdkafka, then restore the saved byte (pop).
 */
void ngx_str_helper(ngx_str_t *str, ngx_str_op op)
{
    static char backup;

    switch (op) {
        case ngx_str_push:
            backup = str->data[str->len];
            str->data[str->len] = 0;
            break;
        case ngx_str_pop:
            str->data[str->len] = backup;
            break;
        default:
            ngx_abort();
    }
}

Build and Install

        Download nginx-1.10.3 and build the module as a dynamic module, following the standard dynamic-module installation steps. librdkafka (headers and shared library) must be installed first, since the module links against it.

        Configure

./configure --prefix=/opt/programs/nginx_1.10.3 --add-dynamic-module=/opt/programs/ngx_kafka_module

        Compile and install

make
make install
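
With this configure line, make install should also copy the dynamic module into the install prefix. A quick check that it landed where load_module expects it (assuming the default module layout):

ls /opt/programs/nginx_1.10.3/modules/ngx_http_kafka_module.so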

        nginx configuration

            Note where the .so file is loaded from: the load_module path below is relative to the nginx prefix.

#user  nobody;
worker_processes  1;

error_log  logs/error.log;
pid        logs/nginx.pid;

load_module modules/ngx_http_kafka_module.so;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main   $remote_addr|$time_local|$request|$status|$body_bytes_sent|$http_user_agent|$http_x_forwarded_for|$request_body;

    access_log  logs/access.log  main;

    sendfile        on;
    keepalive_timeout  65;

 
    kafka_broker_list 172.31.68.243:9092;

    server {
        listen       80;
        server_name  localhost;

        access_log  logs/host.access.log  main;

        location / {

            kafka_topic test2;
        }
    }
}
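
Besides kafka_broker_list and kafka_topic used above, the module also registers per-location kafka_partition and kafka_broker directives; kafka_partition accepts either auto or a numeric partition. A location pinning messages to one partition might look like this (a sketch; the path and values are placeholders):

        location /log {
            kafka_topic     test2;
            kafka_partition 0;
        }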

Testing

    Send a message through the HTTP interface

curl -X POST -d 'guol123456' 'http://172.31.68.243/kafka_topic'

    Check Kafka
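
To verify that the message arrived, consume the topic with Kafka's console consumer (a sketch assuming the standard Kafka CLI tools are available on the broker host; older Kafka releases use --zookeeper instead of --bootstrap-server):

bin/kafka-console-consumer.sh --bootstrap-server 172.31.68.243:9092 --topic test2 --from-beginning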

 

 
