之前是使用NLog直接将日志发送到了ELK,本篇将会使用Docker搭建ELK和kafka,同时替换NLog为Log4net。

一.搭建kafka

1.拉取镜像

//下载zookeeper
docker pull wurstmeister/zookeeper

//下载kafka
docker pull wurstmeister/kafka:2.11-0.11.0.3

2.启动

//启动zookeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper

//启动kafka
docker run -d --name kafka --publish 9092:9092 
--link zookeeper 
--env KAFKA_ZOOKEEPER_CONNECT=192.168.3.131:2181 
--env KAFKA_ADVERTISED_HOST_NAME=192.168.3.131 
--env KAFKA_ADVERTISED_PORT=9092  
--volume /etc/localtime:/etc/localtime 
wurstmeister/kafka:2.11-0.11.0.3

3.测试Kafka

//查看Kafka容器ID
docker ps

进入容器
docker exec -it [容器ID] bin/bash

//创建topic
bin/kafka-topics.sh --create --zookeeper 192.168.3.131:2181 --replication-factor 1 --partitions 1 --topic mykafka

//查看topic
bin/kafka-topics.sh --list --zookeeper 192.168.3.131:2181

//创建生产者
bin/kafka-console-producer.sh --broker-list 192.168.3.131:9092 --topic mykafka 

//再开一个客户端进入容器
//创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.3.131:2181 --topic mykafka --from-beginning

再生产端发送消息,消费端可以成功接收到就说明没问题。

 

二.Docker安装ELK

1.拉取镜像
docker pull sebp/elk

2.启动ELK
docker run -p 5601:5601 -p 9200:9200 -p 9300:9300 -p 5044:5044 -e ES_MIN_MEM=128m  -e ES_MAX_MEM=2048m -d --name elk sebp/elk 

//若启动过程出错一般是因为elasticsearch用户拥有的内存权限太小,至少需要262144
切换到root用户

执行命令:

sysctl -w vm.max_map_count=262144

查看结果:

sysctl -a|grep vm.max_map_count

显示:

vm.max_map_count = 262144

上述方法修改之后,如果重启虚拟机将失效,所以:

解决办法:

在   /etc/sysctl.conf文件最后添加一行

vm.max_map_count=262144

即可永久修改

等几十秒,然后访问9200和5601端口就可以看到ELK相关的面板。

然后我们还需要配置下logstash:

1.查看elk容器ID
docker ps

2.进入elk容器
docker exec -it 容器ID bin/bash

3.执行命令
/opt/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["localhost"] } }'

当命令成功被执行后,看到:Successfully started Logstash API endpoint {:port=>9600} 信息后,输入:this is a dummy entry 然后回车,模拟一条日志进行测试。

打开浏览器,输入:http://:9200/_search?pretty 如图,就会看到我们刚刚输入的日志内容。

注意:如果看到这样的报错信息 Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. 请执行命令:service logstash stop 然后在执行就可以了。

 

OK,测试没问题的话,说明Logstash和ES之间是可以正常联通,然后我们需要配置Logstash从Kafka消费消息:

1.找到config文件
cd /opt/logstash/config

2.编辑配置文件
vi logstash.config
input {
        kafka{
                bootstrap_servers =>["192.168.3.131:9092"]
                client_id => "test" group_id => "test"
                consumer_threads => 5
                decorate_events => true
                topics => "mi"
        }
}
filter{
        json{
                source => "message"
        }
}

output {
        elasticsearch {
                hosts => ["localhost"]
                index => "mi-%{app_id}"
                codec => "json"
        }
}

bootstrap_servers:Kafka地址

这里介绍下这个app_id的作用,生产场景下我们根据不同的项目生成不同的ES索引,比如服务是一个单独的索引,Web是一个,MQ是一个,这些就可以通过传入的app_id来区分创建。

配置完成后加载该配置:

/opt/logstash/bin/logstash -f  /opt/logstash/config/logstash.conf

没问题的话,此时Logstash就会从Kafka消费数据了,然后我们新建一个.net core API项目测试一下:

1.通过NuGet引用 Microsoft.Extensions.Logging.Log4Net.AspNetCore;

2.启动文件中注入 Log4Net:

public static IWebHost BuildWebHost(string[] args) =>
            WebHost.CreateDefaultBuilder(args)
            .ConfigureLogging((logging) =>
            {
                // 过滤掉 System 和 Microsoft 开头的命名空间下的组件产生的警告级别以下的日志
                logging.AddFilter("System", LogLevel.Warning);
                logging.AddFilter("Microsoft", LogLevel.Warning);
                logging.AddLog4Net();
            })
                .UseStartup<Startup>()
                .Build();

3.根目录下添加 log4net.config,设置为 “如果较新则复制”

appender 支持自定义,只要符合 appender 定义规则即可,这里我使用了公司同事大佬写的一个将日志写入 Kafka 的 Nuget 包, log4net.Kafka.Core,安装后在 log4net.config 添加 KafkaAppender 配置即可。log4net.Kafka.Core 源码 在 github 上 ,可以 Fork 自行修改。
<?xml version="1.0" encoding="utf-8" ?>
<log4net>
    <appender name="KafkaAppender" type="log4net.Kafka.Core.KafkaAppender, log4net.Kafka.Core">
        <KafkaSettings>
            <broker value="192.168.3.131:9092" />
            <topic value="mi" />
        </KafkaSettings>
        <layout type="log4net.Kafka.Core.KafkaLogLayout,log4net.Kafka.Core" >
            <appid value="api-test" />
        </layout>
    </appender>
    <root>
        <level value="ALL"/>
        <appender-ref ref="KafkaAppender" />
    </root>
</log4net>
broker: Kafka 服务地址,集群可使用,分割;
topic:日志对应的 Topic 名称;
appid:服务唯一标识,辅助识别日志来源;
 
在 ValuesController 修改代码进行测试:
    [Route("api/[controller]")]
    public class ValuesController : Controller
    {
        private readonly ILogger _logger;

        public ValuesController(ILogger<ValuesController> logger)
        {
            _logger = logger;
        }

        // GET api/values
        [HttpGet]
        public IEnumerable<string> Get()
        {
            _logger.LogInformation("根据appId最后一次测试Kafka!");
            return new string[] { "value1", "value2" };
        }
    }

OK,运行然后访问5601端口查看:

 

 OK,搭建成功!
 
 
参考文章:
内容来源于网络如有侵权请私信删除
你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!