A Simple Distributed Crawler

# URL manager
import pickle
import hashlib


class UrlManager:
    def __init__(self):
        self.new_urls = self.load_progress('new_urls.txt')  # set of URLs not yet crawled
        self.old_urls = self.load_progress('old_urls.txt')  # truncated MD5 digests of crawled URLs

    def has_new_url(self):
        '''
        Check whether any uncrawled URLs remain.
        :return:
        '''
        return self.new_url_size() != 0

    def get_new_url(self):
        '''
        Pop one uncrawled URL and record its truncated MD5 digest
        in the crawled set.
        :return: the next URL to crawl
        '''
        new_url = self.new_urls.pop()
        m = hashlib.md5()
        m.update(new_url.encode('utf-8'))  # hashlib requires bytes, not str
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url

    def add_new_url(self, url):
        '''
        Add a single new URL to the uncrawled set.
        :param url: a single URL
        :return:
        '''
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode('utf-8'))  # hashlib requires bytes, not str
        url_md5 = m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        '''
        Add a batch of new URLs to the uncrawled set.
        :param urls: an iterable of URLs
        :return:
        '''
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    def new_url_size(self):
        '''
        Size of the uncrawled URL set.
        :return:
        '''
        return len(self.new_urls)

    def old_url_size(self):
        '''
        Size of the crawled URL set.
        :return:
        '''
        return len(self.old_urls)

    def save_progress(self, path, data):
        '''
        Persist progress to a local file.
        :param path: file path
        :param data: data to pickle
        :return:
        '''
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    def load_progress(self, path):
        '''
        Load progress from a local file.
        :param path: file path
        :return: a set (empty if the file is missing or invalid)
        '''
        print('[+] Loading progress from file: %s' % path)
        try:
            with open(path, 'rb') as f:
                return pickle.load(f)
        except (OSError, pickle.PickleError):
            print('[!] File invalid, creating: %s' % path)
        return set()
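
# A small smoke test of the dedup logic. This block is an illustrative
# addition rather than part of the original post; it assumes a fresh
# working directory with no saved new_urls.txt / old_urls.txt files.
if __name__ == '__main__':
    um = UrlManager()
    um.add_new_url('http://baike.baidu.com/view/284853.htm')
    um.add_new_url('http://baike.baidu.com/view/284853.htm')  # duplicate, ignored
    print(um.new_url_size())  # 1
    url = um.get_new_url()    # moves the digest into old_urls
    um.add_new_url(url)       # already crawled, filtered via the digest
    print(um.new_url_size())  # 0
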
# Spider worker node
from multiprocessing.managers import BaseManager

# Plain (non-relative) imports so this file can be run as a script.
from HTML_downloader import HtmlDownloader
from HTML_parser import HtmlParser


class Spiderwork:
    def __init__(self):
        # Register the two queues exposed by the control node's manager.
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        server_addr = '127.0.0.1'
        print('connect to %s ....' % server_addr)
        self.m = BaseManager(address=(server_addr, 8001), authkey='baike'.encode('utf-8'))
        self.m.connect()
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print('init finished.')

    def crawl(self):
        while True:
            try:
                if not self.task.empty():
                    url = self.task.get()
                    if url == 'end':
                        print('Control node told this worker to stop.')
                        # Pass the stop marker on to the result queue.
                        self.result.put({'new_urls': 'end', 'data': 'end'})
                        return
                    print('Worker node is parsing: %s' % url)
                    content = self.downloader.download(url)
                    new_urls, data = self.parser.parser(url, content)
                    # Send the extracted URL set (the original put the page
                    # URL here, which broke add_new_urls downstream).
                    self.result.put({'new_urls': new_urls, 'data': data})
            except EOFError:
                print('Lost connection to the control node.')
                return
            except Exception as e:
                print(e)
                print('crawl failed')

if __name__ == '__main__':
    spider = Spiderwork()
    spider.crawl()
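
# Multiple worker nodes can run concurrently: launch this script once per
# worker, on this machine or on others, pointing server_addr above at the
# control node's host.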
# HTML parser
import re
from urllib import parse
from bs4 import BeautifulSoup

class HtmlParser:

    def parser(self, page_url, html_cont):
        '''
        Parse a page: extract outgoing URLs and the article data.
        :param page_url: URL of the downloaded page
        :param html_cont: downloaded page content
        :return: (new_urls, new_data)
        '''
        if page_url is None or html_cont is None:
            return
        soup = BeautifulSoup(html_cont, 'html.parser')
        new_urls = self._get_new_urls(page_url, soup)
        new_data = self._get_new_data(page_url, soup)

        return new_urls, new_data

    def _get_new_urls(self, page_url, soup):
        '''
        Extract the set of new URLs.
        :param page_url: URL of the downloaded page
        :param soup: BeautifulSoup object for the page
        :return: a set of absolute URLs
        '''
        new_urls = set()
        # Extract the <a> tags whose href matches the entry-page pattern.
        links = soup.find_all('a', href=re.compile(r'/item/.'))
        for link in links:
            # Extract the href attribute.
            new_url = link['href']
            # Join with the page URL to get an absolute URL.
            new_full_url = parse.urljoin(page_url, new_url)
            new_urls.add(new_full_url)

        return new_urls

    def _get_new_data(self, page_url, soup):
        '''
        Extract the useful data from an entry page.
        :param page_url: URL of the downloaded page
        :param soup: BeautifulSoup object for the page
        :return: a dict with url, title and summary
        '''
        data = {}
        data['url'] = page_url
        # These selectors match Baidu Baike's entry markup; on pages that
        # lack them, find() returns None and the resulting AttributeError
        # is caught by the worker's crawl() loop.
        title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
        data['title'] = title.text
        summary = soup.find('div', class_='lemma-summary')
        # .text gathers all text contained in the tag.
        data['summary'] = summary.text
        return data
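
# A self-contained check of the parser, an illustrative addition rather
# than part of the original post. The markup below imitates the Baidu
# Baike structure the selectors above expect.
if __name__ == '__main__':
    sample = ('<html><body>'
              '<dd class="lemmaWgt-lemmaTitle-title"><h1>Python</h1></dd>'
              '<div class="lemma-summary">A programming language.</div>'
              '<a href="/item/Guido">Guido</a>'
              '</body></html>')
    parser = HtmlParser()
    new_urls, data = parser.parser('http://baike.baidu.com/item/Python', sample)
    print(new_urls)                        # {'http://baike.baidu.com/item/Guido'}
    print(data['title'], data['summary'])  # Python A programming language.
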
# HTML downloader
import requests

class HtmlDownloader:

    def download(self, url):
        if url is None:
            return None
        # Header values must be plain str; encoding them to bytes (as the
        # original did) is unnecessary and error prone.
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel …) Gecko/20100101 Firefox/57.0'}
        r = requests.get(url, headers=headers)
        if r.status_code == 200:
            r.encoding = 'utf-8'
            return r.text
        return None
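
# download() above has no timeout, so one stalled server can block a worker
# forever. Below is a hedged variant; the helper name, the retry count and
# the 10-second timeout are illustrative choices, not part of the original
# design.
def download_with_retry(url, retries=2, timeout=10):
    if url is None:
        return None
    headers = {'User-Agent': 'Mozilla/5.0'}
    for _ in range(retries + 1):
        try:
            r = requests.get(url, headers=headers, timeout=timeout)
            if r.status_code == 200:
                r.encoding = 'utf-8'
                return r.text
        except requests.RequestException:
            # Transient network error: try again.
            pass
    return None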
# Data store
import codecs
import time


class DataOutput:
    def __init__(self):
        self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime()))
        self.output_head(self.filepath)
        self.datas = []

    def store_data(self, data):
        if data is None:
            return
        self.datas.append(data)
        # Flush to disk once more than 10 entries are buffered.
        if len(self.datas) > 10:
            self.output_html(self.filepath)

    def output_head(self, path):
        '''
        Write the HTML header.
        :param path:
        :return:
        '''
        fout = codecs.open(path, 'w', encoding='utf-8')
        fout.write('<html>')
        fout.write('<body>')
        fout.write('<table>')
        fout.close()

    def output_html(self, path):
        '''
        Append the buffered rows to the HTML file. The closing tags are
        written once, by output_end(), not on every flush.
        :return:
        '''
        fout = codecs.open(path, 'a', encoding='utf-8')
        for data in self.datas:
            fout.write('<tr>')
            fout.write('<td>%s</td>' % data['url'])
            fout.write('<td>%s</td>' % data['title'])
            fout.write('<td>%s</td>' % data['summary'])
            fout.write('</tr>')
        # Clear the buffer after writing; removing items while iterating
        # (as the original code did) skips every other entry.
        self.datas = []
        fout.close()

    def output_end(self, path):
        '''
        Write the HTML footer.
        :param path:
        :return:
        '''
        fout = codecs.open(path, 'a', encoding='utf-8')
        fout.write('</table>')
        fout.write('</body>')
        fout.write('</html>')
        fout.close()
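
# Minimal usage sketch, an illustrative addition; the dict keys match what
# HtmlParser._get_new_data produces.
if __name__ == '__main__':
    out = DataOutput()
    out.store_data({'url': 'http://example.com', 'title': 'demo', 'summary': 'a row'})
    out.output_html(out.filepath)  # flush the buffered entry as a table row
    out.output_end(out.filepath)   # close the table and the document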
# Control node (scheduler)
import time
from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager

from URLManager import UrlManager
from Data_store import DataOutput


class NodeManager:

    def start_Manager(self, url_q, result_q):
        '''
        Create the distributed manager that exposes the two queues
        to worker nodes over the network.
        :param url_q: task (URL) queue
        :param result_q: result queue
        :return: a BaseManager bound to port 8001
        '''
        BaseManager.register('get_task_queue', callable=lambda: url_q)
        BaseManager.register('get_result_queue', callable=lambda: result_q)
        manager = BaseManager(address=('', 8001), authkey='baike'.encode('utf-8'))
        return manager

    def url_manager_proc(self, url_q, conn_q, root_url):
        url_manager = UrlManager()
        # root_url is a single URL string, so use add_new_url; passing it
        # to add_new_urls would iterate over it character by character.
        url_manager.add_new_url(root_url)
        while True:
            while url_manager.has_new_url():
                # Fetch a new URL from the URL manager.
                new_url = url_manager.get_new_url()
                # Hand the new URL to the worker nodes.
                url_q.put(new_url)
                print('old_url=', url_manager.old_url_size())
                # Stop and save once 2000 links have been crawled.
                if url_manager.old_url_size() > 2000:
                    url_q.put('end')
                    print('Control node sent the stop notification.')
                    # Shut down this node and persist both sets.
                    url_manager.save_progress('new_urls.txt', url_manager.new_urls)
                    url_manager.save_progress('old_urls.txt', url_manager.old_urls)
                    return
            try:
                if not conn_q.empty():
                    urls = conn_q.get()
                    url_manager.add_new_urls(urls)
            except Exception:
                time.sleep(0.1)

    def result_solve_proc(self, result_q, conn_q, store_q):
        while True:
            try:
                if not result_q.empty():
                    content = result_q.get(True)
                    if content['new_urls'] == 'end':
                        # The result process received the stop notification.
                        print('Result process received the stop notification.')
                        store_q.put('end')
                        return
                    conn_q.put(content['new_urls'])  # a set of URLs
                    store_q.put(content['data'])     # the parsed data, a dict
                else:
                    time.sleep(0.1)
            except Exception:
                time.sleep(0.1)

    def store_proc(self, store_q):
        output = DataOutput()
        while True:
            if not store_q.empty():
                data = store_q.get()
                if data == 'end':
                    print('Store process received the stop notification.')
                    output.output_end(output.filepath)
                    return
                output.store_data(data)
            else:
                time.sleep(0.1)


if __name__ == '__main__':
    # Initialize the four queues. multiprocessing.Queue is used here: a plain
    # queue.Queue (as in the original) is copied when the child processes are
    # forked, so items put in one process would never be seen by the others.
    url_q = Queue()
    result_q = Queue()
    store_q = Queue()
    conn_q = Queue()
    # Create the distributed manager.
    node = NodeManager()
    manager = node.start_Manager(url_q, result_q)
    # Create the URL-manager, result-handling and data-store processes.
    url_manager_proc = Process(target=node.url_manager_proc,
                               args=(url_q, conn_q, 'http://baike.baidu.com/view/284853.htm'))
    result_solve_proc = Process(target=node.result_solve_proc,
                                args=(result_q, conn_q, store_q))
    store_proc = Process(target=node.store_proc, args=(store_q,))
    # Start the three processes, then serve the manager forever.
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()
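
# Note: start this control node before any workers; serve_forever() must be
# listening on port 8001 before a Spiderwork instance calls connect().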

 

Reposted from: https://www.cnblogs.com/Erick-L/p/7719349.html

