import re
import requests
import json
from threading import Thread,Lock
from concurrent.futures import ThreadPoolExecutor

list1 = []
list2 = []
code_list = [200,301,302,401]  # 定义正确的状态码

class MyThread(Thread):
    '''
    用来获取线程的值
    '''
    def __init__(self,func,args=()):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args
    def run(self):
        self.result = self.func(*self.args)
    def get_result(self):
        try:
            return self.result
        except Exception:
            return None

def get_url():
    '''
    打开存放url的文件,并将结果返回出去
    :return:
    '''
    try:
        with open('hosts.txt','r',encoding='utf-8') as f:
            data = f.readlines()
            return data
    except Exception:  # 文件不存在则返回False
        return False

def verdictUrl():
    '''
    从hosts.txt文件中取出url,然后进行合法性检测
    :return:
    '''
    url_list = []
    comment_list = []
    get_url_res = get_url()
    if get_url_res:
        for data in get_url_res:
            url = data.split(',')[0]
            comment = data.split(',')[-1]
            try:
                res = re.search(r'httpw{0,1}://(w+.){2}w+.*', url).group()
                url_list.append(res)
                comment_list.append(comment)
            except Exception:
                print('url:%s 有误'%url)
        return (url_list,comment_list)
    else:
        print('文件不存在......')

def getStatusCode(url,comment):
    '''
    获取网站的状态码,并将它返回出去
    :param url:
    :param comment:
    :return:
    '''
    global list1,list2
    try:
        res = requests.head(url)
        if res.status_code in code_list:
            lock.acquire()  # 开始添加互斥锁
            list1.append(res.status_code)
            lock.release()
    except requests.exceptions.ConnectionError:
        status = 0  # 自定义状态码
        lock.acquire()
        list2.append(status)
        lock.release()
    else:
        status = res.status_code  # 将状态码赋值给status
    finally:
        return {'url':url,'StatusCode':status,'comment':comment}

def sendDingDing(bc):
    '''
    用来接收getStatusCode的返回值以及钉钉发送消息
    :param bc:
    :return:
    '''
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=e0bef403aded94c230953384353bc411a7fba57389ebd59bc0e63cc602ec175f'
    HEADERS = {
        "Content-Type": "application/json ;charset=utf-8"
    }
    bc = bc.result()
    url = bc['url']
    status = bc['StatusCode']
    comment = bc['comment']
    string_textMsg = {
        'msgtype': 'text',
        'text': {  # 自行添加需要的内容
            'content': 'url地址:%sn'
                       'url名称:%sn'
                       '状态码:%sn'% (url, comment,status)
        }
    }
    string_textMsg = json.dumps(string_textMsg)  # 序列化到内存中
    res = requests.post(ding_url, data=string_textMsg, headers=HEADERS)
if __name__ == '__main__':
    lock = Lock()  # 创建锁对象

    pool = ThreadPoolExecutor(4)  # 线程池
    url,comment = verdictUrl()
    res = zip(url,comment)
    li = []
    for i in res:
        for j in range(4):  # 开启多线程
            t = MyThread(getStatusCode,args = (i[0],i[1]))
            li.append(t)
            t.start()
        for t in li:
            t.join()

        if len(list1)>3 or len(list2)>3:  # 如果xxxxx,则交给sendDingDing处理
            pool.submit(getStatusCode,i[0],i[1]).add_done_callback(sendDingDing)

 

需要注意的是:

1.需要在当前目录下创建hosts.txt文件,文件内容格式为:

  https://www.baidu.com,百度首页

  https://www.trc.com,泰然城首页

  https://www.jd.com,京东商城

2.ding_url换成自己的钉钉机器人webhook链接,也可以换成微信报警

内容来源于网络如有侵权请私信删除
你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!