
Querying Baidu PC homepage coverage by domain with Python

老董 - 我爱我家 Real Estate SEO · 2020-08-24

  How to determine which sites in an industry do well (i.e. get the most traffic):

  1. Collect an industry keyword list and scrape the top-10 Baidu result URLs for each keyword. With 10,000 keywords you end up with 100,000 URLs;

  2. Extract the domain from each of those 100,000 URLs and count how often each domain appears;

  3. Compute each domain's page-one (homepage) coverage as its occurrence count divided by 100,000;

  4. Domains with high coverage are the high-traffic sites.

  [PS: if you record these stats over time, you can spot which sites get penalized and which ones surge. A minimal coverage calculation is sketched right below.]
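  To make steps 2 and 3 concrete, here is a minimal, self-contained sketch of the coverage calculation (the URLs are made-up examples; the full script later in this post does the same thing at scale):

from collections import Counter
from urllib.parse import urlparse

# page-one URLs scraped across the whole keyword set (example data only)
serp_urls = [
    'https://bj.lianjia.com/zufang/',
    'https://www.anjuke.com/',
    'https://bj.lianjia.com/ershoufang/',
]

# count how often each domain appears on page one
domain_counts = Counter(urlparse(url).netloc for url in serp_urls)

# coverage = occurrences of a domain / total number of page-one URLs
total = len(serp_urls)
for domain, num in domain_counts.most_common():
    print('{0}\t{1:.2%}'.format(domain, num / total))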

  Notes on Baidu's anti-scraping measures

  Baidu will block by User-Agent and by cookie; IP bans are relatively rare. The thread count defaults to 1, and Baidu's anti-scraping is stricter now than it used to be, so it is best to keep it at 1. [If multiple threads write to the same file you need a lock, otherwise the data may get garbled - see the sketch after these notes.]

  The most annoying part is that the page the crawler receives can differ from what a real search shows (for the reason below)!

  When you request a search URL built by simple string concatenation, there is no real mouse activity, so the tracking gif request never fires and the visit is easy to flag. The concatenated URL still returns a normal-looking page, but its content differs from an actual search. Conversely, a request without a cookie but with real mouse activity does not get flagged.
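  On the locking point: the script below sidesteps the problem by running a single thread, but if you do go multi-threaded, guarding the shared output file with threading.Lock might look roughly like this (write_row and shared_output.txt are hypothetical, not part of the script):

import threading

write_lock = threading.Lock()
f = open('shared_output.txt', 'w', encoding='utf-8')  # file shared by all worker threads

def write_row(fields):
    # only one thread at a time may write, so lines never get interleaved
    with write_lock:
        f.write('\t'.join(fields) + '\n')
        f.flush()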

  Script features (domain coverage plus page-one keyword counts for target domains, in one script):

  1) Monitor page-one keyword counts for a specified set of domains, broken down by keyword category

  2) Scrape all SERP URLs, extract their domains and compute each domain's page-one coverage

   Script rules:

  1) Both organic results and Baidu Open Platform results are included

  2) For Baidu Open Platform results, the mu attribute holds the ranking URL; when mu is missing, the URL inside the article element is used

  3) kwd_core_city.xlsx: each sheet name is a keyword category and column A of the sheet holds the keywords (a sketch for building such a workbook follows this list)
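  For reference, a workbook in that layout can be generated with openpyxl roughly like this (the categories and keywords are made-up examples, not the author's word list):

from openpyxl import Workbook

# each sheet name is a keyword category; column A holds the keywords
kwd_groups = {
    '租房词': ['北京租房', '上海租房'],
    '二手房词': ['北京二手房', '上海二手房'],
}

wb = Workbook()
wb.remove(wb.active)  # drop the default empty sheet
for group, kwds in kwd_groups.items():
    ws = wb.create_sheet(group)
    for kwd in kwds:
        ws.append([kwd])  # one keyword per row in column A
wb.save('kwd_core_city.xlsx')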

  Script output:

  bdpc1_index_info.txt: each monitored site's rank and URL per keyword; if a site has two ranking URLs, only the first is kept

  bdpc1_index_all.txt: all SERP URLs with their result-style markers; domain coverage is computed from this file by a separate script (bdpc1_tj.py)

  bdpc1_index.xlsx: page-one keyword counts for your own site, per keyword category

  bdpc1_index_domains.xlsx: page-one keyword counts for each monitored site, per keyword category

  bdpc1_index_domains.txt: the same per-site, per-category counts in plain text

  The cookie must come from a logged-in Baidu account, otherwise the requests are very likely to be flagged as a crawler.
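  The script below hard-codes the cookie string inside get_header(). If you prefer not to paste it into the source, a minimal sketch for keeping it in a local file instead (baidu_cookie.txt is a hypothetical file name, not something the original script uses):

def load_cookie(path='baidu_cookie.txt'):
    # baidu_cookie.txt is a hypothetical file holding the raw Cookie header value
    # copied from a logged-in Baidu session (e.g. via the browser dev tools)
    with open(path, 'r', encoding='utf-8') as fp:
        return fp.read().strip()

# inside get_header() below, the hard-coded 'Cookie' entry could then become:
# 'Cookie': load_cookie(),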

# -*- coding: utf-8 -*-
"""
Features:
   1) Monitor page-one keyword counts for a specified set of domains, by keyword category
   2) Scrape all SERP URLs, extract domains and compute each domain's page-one coverage
Notes:
  1) Includes both organic results and Baidu Open Platform results
  2) For Baidu Open Platform results the mu attribute holds the ranking URL; if mu is missing, the URL inside the article element is used
  3) kwd_core_city.xlsx: sheet name = keyword category, column A = keywords
Output:
    bdpc1_index_info.txt: each monitored site's rank and URL per keyword; if there are 2 ranking URLs only the first is kept
    bdpc1_index_all.txt: all SERP URLs and their style markers; coverage is computed from this by a separate script
    bdpc1_index.xlsx: page-one keyword counts for your own site per category
    bdpc1_index_domains.xlsx: page-one keyword counts for each monitored site per category
    bdpc1_index_domains.txt: page-one keyword counts for each monitored site per category

"""

import requests
from pyquery import PyQuery as pq
import threading
import queue
import time
from urllib.parse import urlparse
from openpyxl import load_workbook
from openpyxl import Workbook
import gc
import random


# Compute the final result from bdpc1_index_info.txt
def get_result(file_path, result):
    for line in open(file_path, 'r', encoding='utf-8'):
        line = line.strip().split('\t')
        rank = line[2]
        group = line[3]
        domain = line[4]
        if rank != '无':
            result[domain][group]['首页'] += 1
        result[domain][group]['总词数'] += 1
    return result

# Write a txt file with the results for all monitored domains
def write_domains_txt(result_last):
    with open('{0}bdpc1_index_domains.txt'.format(today), 'w', encoding="utf-8") as f_res:
        f_res.write('{0}\t{1}\t{2}\t{3}\t{4}\n'.format('日期','域名','词类','首页词数','查询词数'))
        for now_domain,dict_value in result_last.items():
            for group, dict_index_all in dict_value.items():
                f_res.write('{0}\t{1}\t{2}\t'.format(today,now_domain,group))
                for key, value in dict_index_all.items():
                    f_res.write(str(value) + '\t')
                f_res.write('\n')


# Write the excel files
def write_myexcel(group_list, result_last, today, my_domain):
    wb = Workbook()
    wb_all = Workbook()
    # create one sheet per keyword category and write the header rows
    sheet_num = 0
    for group in group_list:
        wb.create_sheet(u'{0}'.format(group), index=sheet_num)
        wb_all.create_sheet(u'{0}'.format(group), index=sheet_num)
        row_first = ['日期', '首页', '总词数']
        row_first2 = ['日期', '域名', '首页', '总词数']
        # header row
        wb[group].append(row_first)
        wb_all[group].append(row_first2)
        sheet_num += 1
    # write the data rows
    for domain, dict_value in result_last.items():
        if domain == my_domain:
            for group, dict_index_all in dict_value.items():
                row_value = [today]
                for key, value in dict_index_all.items():
                    row_value.append(value)
                wb[u'{0}'.format(group)].append(row_value)

        for group, dict_index_all in dict_value.items():
            row_value = [today, domain]
            for key, value in dict_index_all.items():
                row_value.append(value)
            wb_all[u'{0}'.format(group)].append(row_value)
    wb.save('{0}bdpc1_index.xlsx'.format(today))
    wb_all.save('{0}bdpc1_index_domains.xlsx'.format(today))

# Fire the js/gif request - not used
def request_js(url, my_header, retry=1):
    try:
        r = requests.get(url=url, headers=my_header, timeout=2)
    except Exception as e:
        print('获取源码失败', e)
        time.sleep(6)
        if retry > 0:
            request_js(url, my_header, retry-1)
    else:
        pass

# Build the request headers (the cookie must come from a logged-in Baidu account)
def get_header():
    my_header = {
       'Accept':'*/*',
'Accept-Encoding':'deflate',
'Accept-Language':'zh-CN',
'Connection':'keep-alive',
'Cookie':'BIDUPSID=F1CF7AB3FC2DA6ECCFEA6C42531C411B; PSTM=1581827129; BAIDUID=F1CF7AB3FC2DA6ECE1FACA537C8B3FAC:FG=1; BD_UPN=17314753; BDORZ=FFFB88E999055A3F8A630C64834BD6D0; H_PS_PSSID=30747_1456_21115; BDUSS=1WdUtBeVNqNH5vS2VaYXQ5UHJGQmFEMXg5dHdRNG1NZG1ZeHFLZkJDRGoxWFplRVFBQUFBJCQAAAAAAAAAAAEAAADMUmcv0PjFq7jx1fPOuAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAONIT17jSE9eZm; delPer=0; BD_CK_SAM=1; PSINO=1; COOKIE_SESSION=239_0_6_5_2_8_0_0_6_2_1_0_295888_0_0_0_1582123020_0_1582254942%7C8%230_2_1582122959%7C1; H_PS_645EC=8f0ehuUMt5Lm6qtroHxMDGgtzbm4tJ7LdVJ2bgmnbQld2bS8ihlqacGtUMGPWw; BDSVRTM=0; WWW_ST=1582255020946',
'DNT':'1',
'Host':'www.baidu.com',
'is_pbs':'cookie%E7%94%9F%E6%88%90%E6%9C%BA%E5%88%B6',
'is_referer':'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=1&tn=myie2dg&wd=cookie%E7%94%9F%E6%88%90%E6%9C%BA%E5%88%B6&oq=cookie%25E4%25BC%259A%25E6%25A0%25B9%25E6%258D%25AEua%25E6%259D%25A5%25E7%2594%259F%25E6%2588%2590%25E5%2590%2597&rsv_pq=e10693e000e4bf54&rsv_t=ee7dSf42B4MCR7cw0%2Fd2EhBKPH2Fjpo%2F51RTpiEA0twnowkIZ%2FBbBWcEDsTbmw&rqlang=cn&rsv_enter=0&rsv_dl=tb&inputT=2578&rsv_sug3=26&rsv_sug1=13&rsv_sug7=100&rsv_sug2=0&rsv_sug4=3358&bs=cookie%E4%BC%9A%E6%A0%B9%E6%8D%AEua%E6%9D%A5%E7%94%9F%E6%88%90%E5%90%97',
'is_xhr':'1',
'Referer':'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=1&tn=myie2dg&wd=cookie%E7%94%9F%E6%88%90%E6%9C%BA%E5%88%B6&oq=cookie%25E7%2594%259F%25E6%2588%2590%25E6%259C%25BA%25E5%2588%25B6&rsv_pq=ab6d996300d5ab56&rsv_t=8f0ehuUMt5Lm6qtroHxMDGgtzbm4tJ7LdVJ2bgmnbQld2bS8ihlqacGtUMGPWw&rqlang=cn&rsv_enter=0&rsv_dl=tb',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36 Maxthon/5.3.8.2000',
'X-Requested-With': 'XMLHttpRequest',}
    return my_header


class bdpcIndexMonitor(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    @staticmethod
    def read_excel(filepath):
        q = queue.Queue()
        group_list = []
        kwd_dict = {}
        wb_kwd = load_workbook(filepath)
        for sheet_obj in wb_kwd:
            sheet_name = sheet_obj.title
            group_list.append(sheet_name)
            kwd_dict[sheet_name]= []
            col_a = sheet_obj['A']
            for cell in col_a:
                kwd = cell.value
                # only queue non-empty cells
                if kwd:
                    q.put([sheet_name,kwd])
        return q, group_list

    # Initialize the result dict
    @staticmethod
    def result_init(group_list):
        result = {}
        for domain in domains:
            result[domain] = {}
            for group in group_list:
                result[domain][group] = {'首页':0,'总词数':0}
        print("结果字典init...")
        return result


    # Fetch the SERP html for one keyword
    def get_html(self, url, my_header, retry=1):
        try:
            r = requests.get(url=url, headers=my_header, timeout=5)
        except Exception as e:
            print('获取源码失败', e)
            time.sleep(6)
            if retry > 0:
                return self.get_html(url, my_header, retry-1)
        else:
            html = r.content.decode('utf-8', errors='ignore')  # r.text sometimes guesses the encoding wrong
            url = r.url  # anti-crawling may redirect; keep the final url
            return html, url

    # Extract all ranking urls (encrypted and real) from a keyword's SERP html
    def get_encrpt_urls(self, html, url):
        encrypt_url_list = []
        real_urls = []
        doc = pq(html)
        title = doc('title').text()
        if '_百度搜索' in title and 'https://www.baidu.com/s?ie=utf-8' in url:
            div_list = doc('.result').items()  # organic results
            div_op_list = doc('.result-op').items()  # Baidu Open Platform (non-organic) results
            for div in div_list:
                rank = div.attr('id')
                if rank:
                    try:
                        a = div('h3 a')
                    except Exception as e:
                        print('未提取自然排名加密链接')
                    else:
                        encrypt_url = a.attr('href')
                        encrypt_url_list.append((encrypt_url, rank))
            for div in div_op_list:
                rank_op = div.attr('id')
                if rank_op:
                    link = div.attr('mu')  # real url; some open-platform styles have no mu attribute
                    if link:
                        real_urls.append((link, rank_op))
                    else:
                        encrypt_url = div('article a').attr('href')
                        encrypt_url_list.append((encrypt_url, rank_op))
        else:
            print('源码异常,可能反爬')
            print(html)
            time.sleep(60)

        return encrypt_url_list, real_urls

    # Resolve one encrypted url to the real url via the redirect Location header
    def decrypt_url(self, encrypt_url, my_header, retry=1):
        real_url = None  # default None
        try:
            encrypt_url = encrypt_url.replace('http://', 'https://')
            r = requests.head(encrypt_url, headers=my_header)
        except Exception as e:
            print(encrypt_url, '解密失败', e)
            time.sleep(6)
            if retry > 0:
                return self.decrypt_url(encrypt_url, my_header, retry-1)
        else:
            real_url = r.headers['Location'] if 'Location' in r.headers else None
        return real_url

    # Resolve the real urls for all page-one encrypted urls (not used by run())
    def get_real_urls(self, encrypt_url_list, my_header):
        real_url_list = [self.decrypt_url(encrypt_url, my_header) for encrypt_url in encrypt_url_list]
        real_url_set = set(real_url_list)
        real_url_set.discard(None)  # discard() returns None, so do not reassign the set
        return list(real_url_set)

    # Extract the domain part of a url
    def get_domain(self,real_url):
        domain = None
        try:
           res = urlparse(real_url)
        except Exception as e:
           print (e,real_url)
        else:
           domain = res.netloc
        return domain

    # Get all page-one domains for one keyword's SERP
    def get_domains(self, real_url_list):
        domain_list = [self.get_domain(real_url) for real_url in real_url_list]
        # if one domain has several ranking urls for a keyword, count it once
        domain_set = set(domain_list)
        domain_set.discard(None)  # discard() returns None, so do not reassign the set
        domain_str = ','.join(domain_set)
        return domain_str

    # Thread worker
    def run(self):
        while 1:
            group_kwd = q.get()
            group, kwd = group_kwd
            print(group, kwd)
            try:
                url = "https://www.baidu.com/s?ie=utf-8&rsv_bp=1&tn=87048150_dg&wd={0}".format(kwd)
                my_header = get_header()
                html, now_url = self.get_html(url, my_header)
                encrypt_url_list_rank, real_urls_rank = self.get_encrpt_urls(html, now_url)
                # only write when the html looked ok
                if encrypt_url_list_rank:
                    for my_serp_url, my_order in encrypt_url_list_rank:
                        my_real_url = self.decrypt_url(my_serp_url, my_header)
                        real_urls_rank.append((my_real_url, my_order))
                    real_urls = []
                    for my_real_url, my_order in real_urls_rank:
                        real_urls.append(my_real_url)
                        f_all.write('{0}\t{1}\t{2}\t{3}\n'.format(kwd, my_real_url, my_order, group))
                    domain_str = self.get_domains(real_urls)
                    # did each target domain appear?
                    for domain in domains:
                        if domain not in domain_str:
                            f.write('{0}\t{1}\t{2}\t{3}\t{4}\n'.format(kwd, '无', '无', group, domain))
                        else:
                            for my_url, my_order in real_urls_rank:
                                if my_url and domain in my_url:  # my_url may be None if decryption failed
                                    f.write('{0}\t{1}\t{2}\t{3}\t{4}\n'.format(kwd, my_url, my_order, group, domain))
                                    break  # keep only the first ranking url
                f.flush()
                f_all.flush()
            except Exception as e:
                print(e)
            finally:
                del kwd
                gc.collect()
                q.task_done()


if __name__ == "__main__":
    start = time.time()
    local_time = time.localtime()
    today = time.strftime('%Y%m%d', local_time)
    domains = ['5i5j.com', 'lianjia.com', 'anjuke.com', 'fang.com']  # target domains
    my_domain = '5i5j.com'
    q, group_list = bdpcIndexMonitor.read_excel('2020kwd_url_core_city_unique.xlsx')  # keyword queue and categories
    result = bdpcIndexMonitor.result_init(group_list)  # result dict
    # print(result)
    all_num = q.qsize()  # total number of keywords
    f = open('{0}bdpc1_index_info.txt'.format(today), 'w', encoding="utf-8")
    f_all = open('{0}bdpc1_index_all.txt'.format(today), 'w', encoding="utf-8")
    file_path = f.name
    # number of threads (keep at 1, see the anti-crawling notes above)
    for i in list(range(1)):
        t = bdpcIndexMonitor()
        t.daemon = True
        t.start()
    q.join()
    f.close()
    f_all.close()
    # compute the results from bdpc1_index_info.txt
    result_last = get_result(file_path, result)
    # write the txt file
    write_domains_txt(result_last)
    # write the excel files
    write_myexcel(group_list, result_last, today, my_domain)
    end = time.time()
    print('关键词共{0}个,耗时{1}min'.format(all_num, (end - start) / 60))

 

  Test run on roughly 7,400 "{location} + 租房/二手房" (rental / resale housing) keywords; the top 20 domains are below. (These keywords are a bit special: the big industry sites use many city-level subdomains, so you can aggregate by registrable domain for a further look - see the sketch after the table.)

xxx年xx月xx日  
Domain  Share
www.anjuke.com 6.02%
m.anjuke.com 3.81%
m.fang.com 3.05%
www.fang.com 3.00%
shanghai.anjuke.com 2.73%
bj.lianjia.com 1.97%
map.baidu.com 1.65%
beijing.anjuke.com 1.62%
bj.58.com 1.50%
sh.lianjia.com 1.47%
sh.58.com 1.43%
bj.5i5j.com 1.42%
fangjia.fang.com 1.29%
esf.fang.com 1.25%
zu.fang.com 1.21%
wh.58.com 1.11%
www.youtx.com 1.10%
www.qk365.com 1.05%
tj.5i5j.com 1.05%
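  As noted above, the big portals split cities across subdomains (bj.lianjia.com, sh.lianjia.com, ...), so rolling the shares up to the registrable domain gives a clearer picture. A minimal sketch using the third-party tldextract package (an extra dependency, not used by the scripts in this post; the numbers are just sample values from the table above):

from collections import Counter
import tldextract  # pip install tldextract

subdomain_share = {
    'www.anjuke.com': 6.02, 'm.anjuke.com': 3.81,
    'bj.lianjia.com': 1.97, 'sh.lianjia.com': 1.47,
}

rollup = Counter()
for host, share in subdomain_share.items():
    ext = tldextract.extract(host)
    registered = '.'.join(p for p in [ext.domain, ext.suffix] if p)  # e.g. lianjia.com
    rollup[registered] += share

for domain, share in rollup.most_common():
    print('{0}\t{1:.2f}%'.format(domain, share))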

  Appendix: the bdpc1_tj.py script

# -*- coding: utf-8 -*-
"""
Compute domain coverage from the bdpc1_index_all.txt data
sigma.baidu.com: "xx related sites | xx related companies" blocks
recommend_list.baidu.com: "other people also searched for" block
nourl.ubs.baidu.com: smart aggregation results
bzclk.baidu.com: structured display styles
"""

import time
from urllib.parse import urlparse
from openpyxl import load_workbook
from openpyxl import Workbook


# Extract the domain part of a url
def get_domain(real_url):

    # urls taken from the mu attribute of non-organic results may be empty
    try:
       res = urlparse(real_url)  # an empty real_url does not raise
    except Exception as e:
       print(e, real_url)
       domain = "xxx"
    else:
       domain = res.netloc
    return domain


# Read the workbook to get the keyword categories (sheet names)
def read_excel(filepath):
    city_list = []
    wb_kwd = load_workbook(filepath)
    for sheet_obj in wb_kwd:
        sheet_name = sheet_obj.title
        city_list.append(sheet_name)
    return city_list

# Initialize the result dict
def result_init(group_list):
    result = {}
    for group in group_list:
        result[group] = {}
    print("结果字典init...")
    return result

def save():
    res_format = result.items()
    # write the excel file
    wb = Workbook()
    # create one sheet per keyword category
    sheet_num = 0
    for city in city_list:
        wb.create_sheet(u'{0}'.format(city), index=sheet_num)
        sheet_num += 1
    for city, data_dict in res_format:
        sort_dict = sorted(data_dict.items(), key=lambda s: s[1], reverse=True)
        for domain, num in sort_dict:
            row_value = [domain, num]
            wb[u'{0}'.format(city)].append(row_value)
    wb.save('{0}bdpc1_index_cover.xlsx'.format(today))

    # write the txt file
    res_format = sorted(result_all.items(), key=lambda s: s[1], reverse=True)
    with open('{0}bdpc1_domain_res.txt'.format(today), 'w', encoding='utf-8') as f:
        for domain, num in res_format:
            f.write(domain + '\t' + str(num) + '\t' + str('{:.2%}'.format(num/count)) + '\n')


if __name__ == "__main__":
    start = time.time()
    local_time = time.localtime()
    today = time.strftime('%Y%m%d', local_time)
    today = '20200215'  # hard-coded to point at the files from a specific run
    city_list = read_excel('2020kwd_core_city.xlsx')
    result = result_init(city_list)  # initialize the result dict
    result_all = {}  # overall stats, not split by city

    # the file can be large, so count its lines first
    count = 0
    for count, line in enumerate(open('{0}bdpc1_index_all.txt'.format(today), 'r', encoding='utf-8'), 1):
        pass
    print(count)

    # count how many times each domain appears
    for i in open('{0}bdpc1_index_all.txt'.format(today), 'r', encoding='utf-8'):
        i = i.strip()
        line = i.split('\t')
        url = line[1]
        city = line[3]
        if url.startswith('http'):
            domain = get_domain(url)
            result[city][domain] = result[city][domain] + 1 if domain in result[city] else 1
            result_all[domain] = result_all[domain] + 1 if domain in result_all else 1
        else:
            print(url)
    # save the results to file
    save()

    end = time.time()
