Python编写一个优美的下载器

yipeiwu_com5年前Python基础

本文实例为大家分享了Python编写下载器的具体代码,供大家参考,具体内容如下

#!/bin/python3 
# author: lidawei 
# create: 2016-07-11 
# version: 1.0 
# 功能说明: 
#  从指定的URL将文件取回本地 
##################################################### 
 
import http.client 
import os 
import threading 
import time 
import logging 
import unittest 
from queue import Queue 
from urllib.parse import urlparse 
 
logging.basicConfig(level = logging.DEBUG, 
     format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 
     datefmt = '%a, %d %b %Y %H:%M:%S', 
     filename = 'Downloader_%s.log' % (time.strftime('%Y-%m-%d')), 
     filemode = 'a') 
 
class Downloader(object): 
 '''''文件下载器''' 
 url = '' 
 filename = '' 
 
 def __init__(self, full_url_str, filename): 
  '''''初始化''' 
  self.url = urlparse(full_url_str) 
  self.filename = filename 
 
 def download(self): 
  '''''执行下载,返回True或False''' 
  if self.url == '' or self.url == None or self.filename == '' or self.filename == None: 
   logging.error('Invalid parameter for Downloader') 
   return False 
 
  successed = False 
  conn = None 
  if self.url.scheme == 'https': 
   conn = http.client.HTTPSConnection(self.url.netloc) 
  else: 
   conn = http.client.HTTPConnection(self.url.netloc) 
  conn.request('GET', self.url.path) 
  response = conn.getresponse() 
  if response.status == 200: 
   total_size = response.getheader('Content-Length') 
   total_size = (int)(total_size) 
   if total_size > 0: 
    finished_size = 0 
    file = open(self.filename, 'wb') 
    if file: 
     progress = Progress() 
     progress.start() 
     while not response.closed: 
      buffers = response.read(1024) 
      file.write(buffers) 
 
      finished_size += len(buffers) 
      progress.update(finished_size, total_size) 
      if finished_size >= total_size: 
       break 
     # ... end while statment 
     file.close() 
     progress.stop() 
     progress.join() 
    else: 
     logging.error('Create local file %s failed' % (self.filename)) 
    # ... end if statment 
   else: 
    logging.error('Request file %s size failed' % (self.filename)) 
   # ... end if statment 
  else: 
   logging.error('HTTP/HTTPS request failed, status code:%d' % (response.status)) 
  # ... end if statment 
  conn.close() 
 
  return successed 
 # ... end download() method 
# ... end Downloader class 
 
class DataWriter(threading.Thread): 
 filename = '' 
 data_dict = {'offset' : 0, 'buffers_byte' : b''} 
 queue = Queue(128) 
 __stop = False 
 
 def __init__(self, filename): 
  self.filename = filename 
  threading.Thread.__init__(self) 
 
 #Override 
 def run(self): 
  while not self.__stop: 
   self.queue.get(True, 1) 
 
 def put_data(data_dict): 
  '''''将data_dict的数据放入队列,data_dict是一个字典,有两个元素:offset是偏移量,buffers_byte是二进制字节串''' 
  self.queue.put(data_dict) 
 
 def stop(self): 
  self.__stop = True 
 
class Progress(threading.Thread): 
 interval = 1 
 total_size = 0 
 finished_size = 0 
 old_size = 0 
 __stop = False 
 
 def __init__(self, interval = 0.5): 
  self.interval = interval 
  threading.Thread.__init__(self) 
 
 #Override 
 def run(self): 
  # logging.info('  Total  Finished  Percent  Speed') 
  print('  Total  Finished  Percent  Speed') 
  while not self.__stop: 
   time.sleep(self.interval) 
   if self.total_size > 0: 
    percent = self.finished_size / self.total_size * 100 
    speed = (self.finished_size - self.old_size) / self.interval 
    msg = '%12d %12d %10.2f%% %12d' % (self.total_size, self.finished_size, percent, speed) 
    # logging.info(msg) 
    print(msg) 
 
    self.old_size = self.finished_size 
   else: 
    logging.error('Total size is zero') 
  # ... end while statment 
 # ... end run() method 
 
 def stop(self): 
  self.__stop = True 
 
 def update(self, finished_size, total_size): 
  self.finished_size = finished_size 
  self.total_size = total_size 
 
class TestDownloaderFunctions(unittest.TestCase): 
 
 def setUp(self): 
  print('setUp') 
 
 def test_download(self): 
  url = 'http://dldir1.qq.com/qqfile/qq/QQ8.4/18376/QQ8.4.exe' 
  filename = 'QQ8.4.exe' 
  dl = Downloader(url, filename) 
  dl.download() 
 
 def tearDown(self): 
  print('tearDown') 
 
if __name__ == '__main__': 
 unittest.main() 

这是测试结果:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

python使用wmi模块获取windows下硬盘信息的方法

本文实例讲述了python使用wmi模块获取windows下硬盘信息的方法。分享给大家供大家参考。具体实现方法如下: # -*- coding: utf-8 -*- #import...

Python使用pymysql模块操作mysql增删改查实例分析

本文实例讲述了Python使用pymysql模块操作mysql增删改查。分享给大家供大家参考,具体如下: # -*- coding:utf-8 -*- import pymysql...

Python中字典(dict)和列表(list)的排序方法实例

一、对列表(list)进行排序 推荐的排序方式是使用内建的sort()方法,速度最快而且属于稳定排序复制代码 代码如下:>>> a = [1,9,3,7,2,0,5]&...

Python 实现 贪吃蛇大作战 代码分享

Python 实现 贪吃蛇大作战 代码分享

感觉游戏审核新政实施后,国内手游市场略冷清,是不是各家的新游戏都在排队等审核。媒体们除了之前竞相追捧《Pokemon Go》热闹了一把,似乎也听不到什么声音了。直到最近几天,突然听见好...

python基于pyDes库实现des加密的方法

本文实例讲述了python基于pyDes库实现des加密的方法。分享给大家供大家参考,具体如下: 下载及简介地址:https://twhiteman.netfirms.com/des.h...