Kafka

首先介绍一下我了解的kafka的皮毛信息——

  • kafka——一个分布流处理系统:流处理:可以像消息队列一样publish或者subscribe信息;分布式:提供了容错性,并发处理消息的机制
  • 集群——kafka运行在集群上,集群包含一个或多个服务器。所谓服务器集群,就是将很多服务器集中在一起进行同一种服务,在客户端看起来像是只有一个服务器。集群可以利用多个计算机进行并行计算从而有很高的计算速度,也可以使用多个计算机做备份,从而使得一个机器坏了,整个系统还能正常运行
  • Broker——一个集群有多个broker(一台服务器就是一个broker),一个broker可以容纳多个Topic
  • Topic——主题,由用户定义并配置在kafka服务器,用于建立生产者和消费者之间的订阅关系。生产者发送消息到指定的topic下,消费者从这个topic下消费消息。每一条消息包含键值(key),值(value)和时间戳(timestamp)
  • Producer——消息生产者,就是像kafka broker发消息的客户端
  • Consumer——消息消费者,是消息的使用方,负责消费kafka服务器上的信息
  • Partition——消息分区,一个Topic可以分为多个Partition,每个Partition是一个有序的队列,Partition中的每条消息都会被分配一个有序的id(offset)
  • offset——消息在Partition中的偏移量,每一条消息在Partition都有唯一的偏移量。
  • Consumer Group——消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

kafka-python

kafka-python是一个python的Kafka客户端,可以用来向kafka的topic发送消息、消费消息。

接下来介绍如何封装自定义的kafka库,然后再RobotFramework上使用——

  1. 创建文件夹
  • 在D:Python27Libsite-packages的文件夹里面创建你的自定义库文件夹,例如lmkafka

  • 在文件夹里面创建两个文件,分别是__init__.py和producer.py(该文件名自定义),代码如下
#producer.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
#命令行输入pip install kafka-python,进行下载

from kafka import KafkaProducer


class Produce(object):
    def produce(self,ip,topic,filepath):
        producer = KafkaProducer(bootstrap_servers=[ip])
        with open(filepath) as f:
            msg = f.read()
            print msg
        # 发送
        producer.send(topic, msg)
        #print filepath
        producer.close()
#__init__.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-

from producer import Produce

class lmkafka(Produce):
    ROBOT_LIBRARY_SCOPE = 'GLOBAL'
  • 创建成功后,尝试重启ride,导入lmkafka库,如果导入不成功(库名为红色),则需要添加路径——在D:Python27Libsite-packages的文件夹里面添加.pth文件,例如lmkafka.pth
  • 文件内容:D:Python27Libsite-packageslmkafka
  • 再次查看,库导入成功

  •  

 库的使用——

可以向kafka里面发送数据啦~

 

内容来源于网络如有侵权请私信删除
你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!