前言

本文将介绍如何使用Apisix插件将HTTP请求日志发送到Kafka消息队列中,并使用ELK(Elasticsearch、Logstash和Kibana)堆栈进行检索和分析。我们将讨论如何配置Apisix插件以捕获和格式化日志数据,并如何设置Kafka的生产者和消费者,以及如何使用Logstash将日志数据从Kafka传输到Elasticsearch。最后,我们将演示如何使用Kibana可视化和搜索日志数据,以便更好地理解和优化API的性能。通过阅读本文,您将学习如何使用Apisix插件和ELK堆栈来实现高效的日志收集和分析。

操作流程

开启Apisix日志收集插件

  1. 在Apisix Dashboard首页,进入插件列表页面,点击启用

    image-20220812174841984

  2. 可观测性中,找到Kafka-logger插件,点击启用

    image-20220812174956450

  3. 勾选启用,填写json配置

    image-20220812175032860

    配置:

    {
      "batch_max_size": 10,
      "broker_list": {
        "10.246.0.37": 9092,
        "10.246.0.38": 9092,
        "10.246.0.39": 9092
      },
      "include_req_body": true,
      "include_resp_body": true,
      "kafka_topic": "apisix-logs"
    }
    
    • 关键参数说明:

      • batch_max_size:Apisix为日志收集插件默认启用Batch-Processor,该参数设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到后端服务;
      • broker_list:要推送的 kafka 的 broker 列表
      • include_req_body:是否包括请求 body
      • include_resp_body:是否包括响应体
      • kafka_topic:要推送的 topic

      其他参数详见官方文档

    1. 提交即时生效。

使用LogStash消费Kafka数据至ES

  1. 下载LogStash,官方地址

    image-20220812175948868

  2. 配置logstash.conf

    input {
            kafka {
                    bootstrap_servers => "10.246.0.37:9092,10.246.0.38:9092,10.246.0.39:9092"
                    client_id => "c_logstash_1"
                    group_id => "g_logstash_1"
                    auto_offset_reset => "latest" 
                    consumer_threads => 1
                    decorate_events => true
                    topics => "apisix-logs"
                    codec => "json"
            }
    }
    filter {
          date { 
               match => ["start_time", "UNIX_MS"] 
               target => "@timestamp" 
          } 
          # 定义uuid插件
          uuid {
            # 生成的唯一id,保存到target指定的字段
            target    => "uuid"
            # 如果target指定的字段已经存在,是否覆盖
            overwrite => true
          }
    }
    output {
            stdout {
                codec => rubydebug { }
            }
            elasticsearch {
                    hosts => ["10.246.0.37:9201","10.246.0.38:9201","10.246.0.39:9201"]
                    index => "apisix-logstash-%{[server][hostname]}-%{+YYYY.MM.dd}"
                    document_type => "doc"
                    document_id => "%{uuid}"
                    manage_template => false
            }
    }
    

    说明:

    • input中,kafka.topics需要按实际情况调整,这里可以配置为数组,如topics => [“apisix-logs”,“apisix-logs2”]
    • filter中,将json中的start_time格式化后,当成es中的时间戳
    • output中,stdout是打印debug日志,生产环境可以省略;elasticsearch.document_id中,是利用logstash插件logstash-filter-uuid生成的
  3. 配置后,启动logstash,如果一切正常,可以在es中看到生成了索引

    image-20220812180444037

在Kibana中新增索引

  1. 在Kibana的Management页面中,点击Index Patterns

    image-20220812180616989

  2. Create Index Pattern新增索引

    image-20220812180700238

  3. 输入apisix-logstash*

    image-20220812180810765

  4. 选择时间筛选为@timestamp,点击Create index pattern

    image-20220812180902540

  5. Discover就可以看到ES中的数据了

    image-20220812181055566