[算法] 海量日志数据, 提取出某日访问百度次数最多的那个IP

问题: 海量日志数据, 提取出某日访问百度次数最多的那个IP.

首先是这一天, 并且是访问百度的日志中的IP取出来, 逐个写入到一个大文件中. 注意到IP是32位的, 最多有2^32个不同的IP(事实上不可能, 得扣去特殊的IP), 在重复量小的情况下, 内存放不下这些不同的IP. 同样可以采用映射的方法, 比如模1000, 把整个大文件映射为1000个小文件, 再找出每个小文件中出现频率最大的IP(可以采用hash_map进行频率统计, 然后再找出频率最大的几个)及相应的频率. 然后再在这1000个最大的IP中, 找出那个频率最大的IP, 即为所求.

具体做法如下:

  1. 按照IP地址的Hash(IP)%1024值, 把海量IP日志分别存储到1024个小文件中.
  2. 对于每一个小文件, 构建一个以IP为key, 出现次数为value的HashMap, 同时记录当前出现次数最多的那个IP地址;
  3. 得到1024个小文件中的出现次数最多的IP, 再依据常规的排序算法得到总体上出现次数最多的IP.

生成数据

生成1亿个IP地址(这里分两种情况, 一种IP重复率大, 一种IP重复率小), 先生成重复率小的, 写入文件中:

import random
from time import ctime
import gc
def generage_random_IP(file_loc, number):
    with open(file_loc, 'a+') as f:
        # (1) 重复率大, 即使直接遍历也不会超出内存限制
        # IPs = '\n'.join(['255.255.' + str(random.randint(0,255)) + '.' + str(random.randint(0,255)) for i in xrange(number)])
        # (2) 重复率小, 会真正导致后面大数据
        IPs = '\n'.join(['.'.join([str(random.randint(0,255)) for i in range(4)]) for i in xrange(number)])
        f.write(IPs)
    gc.collect()
    IPs = None

if __name__ == '__main__':
    print(ctime())
    for i in xrange(50):
        print('  ' + str(i) + ": " + ctime())
        generage_random_IP('IPs100000000.txt', 2000000) # 50 * 2000000 根据内存限制调整
    print(ctime())

测试了下, 时间基本上是花在生成随机数上面. 大概需要十几分钟.

分割文件

from time import ctime
def devide_IPs(file_loc):
    IPs = {}
    i = 0
    with open(file_loc, 'rb') as f:
        for line in f:
            i += 1
            key = line.split('.')[2]
            # key = str(hash(line) % 1024)  # 这里似乎是求余花的时间比较多, hash花的时间不比split多
            if key not in IPs:
                IPs[key] = [line]
            else:
                IPs[key].append(line)
            if i >= 100000:
                for k, v in IPs.items():
                    with open('ips' + k + '.txt', 'a+') as f2:
                        f2.writelines(v)
                        del IPs[k][:]
                i = 0

if __name__ == '__main__':
    print(ctime())
    devide_IPs('IPs100000000.txt')
    print(ctime())

200513318 function calls (200513315 primitive calls) in 348.061 seconds

大概需要6、7分钟

教训: 不要乱添加打印语句, 时间都耗在上面了. 刚开始每读一行都输出, 运行了一个多小时还没完...

计算IP数, 将每个文件中IP重复最多的IP的列表写入文件.

import pickle
from time import ctime
def count_IPs(file_loc):
    IPs = {}
    try:
        with open(file_loc, 'rb') as f:
            for line in f:
                if line in IPs:
                    IPs[line] += 1
                else:
                    IPs[line] = 0
                # IPs[line] = IPs.get(line, 0) + 1
        max_ip, max_value = max(IPs.iteritems(), key=lambda x:x[1])  # 这句比较耗时
        return {max_ip: max_value}
    except:
        pass

if __name__ == '__main__':
    print(ctime())
    max_IPs = []
    for i in range(256):
        max_IP = count_IPs('ips' + str(i) + '.txt')
        if max_IP is not None:
            max_IPs.append(max_IP)
    print(ctime())
    with open('count_IPs', 'w') as f:
        pickle.dump(max_IPs, f)

98757714 function calls (98756943 primitive calls) in 179.475 seconds

大概需要3分钟

import pickle
from time import ctime
def get_max_IP():
    with open('count_IPs', 'r') as f:
        max_IPs = pickle.load(f)
    # print(max_IPs)
    print(max(max_IPs, key=lambda x:x.values()))

if __name__ == '__main__':
    print(ctime())
    get_max_IP()
    print(ctime())

11209 function calls (11206 primitive calls) in 0.016 seconds

大概需要0.01s左右

上面是IP重复率小的情况, 另外一种情况, IP的重复率比较大的情况下, 即使直接遍历文件, 内存占用也不会超过128M, 并且节省了文件分割的时间. 用时大概1分半分钟.

def count_IPs_once(file_loc):
    IPs = {}
    with open(file_loc, 'rb') as f:
        for line in f:
            if line in IPs:
                IPs[line] += 1
            else:
                IPs[line] = 0
    print(max(IPs.iteritems(), key=lambda x:x[1]))

if __name__ == '__main__':
    print(ctime())
    count_IPs_once('IPs100000000.txt')  # 注意: IP重复量小的情况下, 内存会超过限制
    print(ctime())

66773 function calls (66770 primitive calls) in 91.358 seconds

总结: 大数据与文件的大小其实关系不大, 真正有关系的是计数时存入字典的IP量, 如果不重复的IP很多的话, 这个数据占用的内存就会持续增加. 如果IP重复量大, 则并不会占用太多内存.

参考: Python处理海量数据的实战研究