文档章节

Selenium2 Python 自动化测试实战学习笔记(八)

henni_719
 henni_719
发布于 2017/04/22 17:15
字数 3253
阅读 6
收藏 0

Python 多线程

分布式和并行是完全不同的概念,分布式只负责将一个测试脚本可调用不同的远程环境来执行;并行强调“同时”的概念,它可以借助多线程或多进程技术并行来执行脚本技术。


10.1 单进程的时代


         在单线程的时代,当处理器要处理多个任务时,必须要对这些任务排一下执行顺序并按照这个顺序来执行任务。假如我们创建了两个任务,听音乐(music)和看电影(move),在单线程中我们只能按先后顺序来执行这两个任务。

Onethread.py

#coding=utf-8
from time import sleep, ctime
 
def music():
   print 'I was listening to music! %s' %ctime()
   sleep(2)
 
def move():
   print 'I was at the movies! %s' %ctime()
   sleep(5)
   
if __name__ == '__main__':
   music()
   move()
print 'allend:', ctime()

         别创建了两个任务music 和move,执行music 需要2 秒,执行move 需要5 秒,通过sleep()

方法设置休眠时间来模拟任务的运行时间。

    给music() 和move()两个方法设置参数,用于接收要播放的歌曲和视频,通过for 循环控制播放的次数,分别让歌曲和电影播放了2 次:onethread_with_pri.py:

#coding=utf-8
from time import sleep, ctime
 
def music(func):
    for i in range(2):
        print 'I was listening to music %s! %s'%(func,ctime())
        sleep(2)
 
def move(func):
     for i in range(2):
        print 'I was at the movies %s! %s'%(func,ctime())
        sleep(5)
   
if __name__ == '__main__':
    music(u'爱情买卖')
    move(u'阿凡达')
print 'all end:', ctime()


10.2 多线程技术


         Python 通过两个标准库thread和threading 提供对线程的支持。thread 提供了低级别的、原始的线程以及一个简单的锁。threading 基于Java 的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在Python 中则是独立的对象。


   10.2.1 threading 模块


         避免使用thread 模块,因为是它不支持守护线程。当主线程退出时,所有的子线程不论它

们是否还在工作,都会被强行退出。threading模块则支持守护线程。Threads.py:

#coding=utf-8
from time import sleep, ctime
import threading
 
def music(func):
   for i in range(2):
        print 'I was listening to music %s! %s'%(func,ctime())
        sleep(2)
 
def move(func):
    for i in range(2):
        print 'I was at the movies %s! %s'%(func,ctime())
        sleep(5)
 
threads=[]
t1=threading.Thread(target=music,args=(u'爱情买卖',))
threads.append(t1)
t2=threading.Thread(target=move,args=(u'阿凡达',))
threads.append(t2)
   
if __name__ == '__main__':
   for i in threads:
        i.start()
   #keep thread
   for i in threads:
        i.join()
       
print 'all end:', ctime()

import threading 引入线程模块。

threads = [] 创建线程数组,用于装载线程。

threading.Thread()通过调用threading模块的Thread()方法来创建线程。

start() 开始线程活动。

join() 等待线程终止。

通过for 循环遍历thread 数组中所装载的线程;然后通过start()函数启动每一个线程。

join()会等到线程结束,或者在给了timeout 参数的时候,等到超时为止。join()的另一个比较重要的方面是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。


  10.2.2 优化线程的创建

         从上面例子中发现线程的创建是颇为麻烦的,每创建一个线程都需要创建一个t(t1、t2、...),如果创建的线程较多时这样极其不方便。Player.py

#coding=utf-8
from time import sleep, ctime
import threading
 
def music(func):
    for i in range(2):
        print 'I was listening to music %s! %s'%(func,ctime())
        sleep(2)
 
def move(func):
     for i in range(2):
        print 'I was at the movies %s! %s'%(func,ctime())
        sleep(5)
def player(name):
    r=name.split(".")[1]
    if r=="mp3" orr=="MP3":
        music(name)
    elif r=="mp4" or  r=="MP4":
        move(name)
    else:
        print "Error:the format is notrecognized!"
 
list=["the sale oflove.mp3","love.MP3","a fan da.mp4","the sale oflove.MP4"]
threads=[]
files=range(len(list))
for i in files:
   t=threading.Thread(target=player,args=(list[i],))
    threads.append(t)
   
if __name__ == '__main__':
    for i in files:
        threads[i].start()
 
    #keep thread
    for i in files:
        threads[i].join()
       
print 'all end:', ctime()

    创建了一个player()函数,这个函数用于判断播放文件的类型。如果是mp3 格式的,

我们将调用music()函数,如果是mp4 格式的我们调用move()函数。哪果两种格式都不是那么只能告诉用户你所提供有文件我播放不了。

         然后创建了一个list 的文件列表,注意为文件加上后缀名。然后我们用len(list) 来计算list列表有多少个文件,确定循环次数。

    接着通过一个for 循环,把list 中的文件添加到线程中数组threads[]中。接着启动threads[]线程组,最后打印结束时间。

现在向list 数组中添加一个文件,程序运行时会自动为其创建一个线程。

 

通过上面的程序,我们发现player()用于判断文件扩展名,然后调用music()和move() ,其实,music()和move()完整工作是相同的:Super_player.py

#coding=utf-8
from time import sleep, ctime
import threading
 
def super_player(name,time):
    for i in range(2):
        print 'Start playing...%s!%s' %(file,ctime())
        sleep(time)
   
 
dic={"love.mp3":3,"love.rmvb":103,"love.mp4":65}
threads=[]
files=range(len(dic))
for file,time in dic.items():
   t=threading.Thread(target=super_player,args=(file,time))
    threads.append(t)
   
if __name__ == '__main__':
    for i in files:
        threads[i].start()
 
    #keep thread
    for i in files:
        threads[i].join()
       
print 'all end:', ctime()

首先创建字典list ,用于定义要播放的文件及时长(秒),通过字典的items()方法来循环的取file和time,取到的这两个值用于创建线程。接着创建super_player()函数,用于接收file 和time,用于确定要播放的文件及时长。最后是线程启动运行。

  10.2.3 创建线程类

                   创建自己的线程类:mythread.py

#coding=utf-8
import threading
from time import sleep,ctime
 
class MyThread(threading.Thread):
   def __init__(self,func,args,name=" "):
       threading.Thread.__init__(self)
       self.name=name
       self.func=func
       self.args=args
 
   def run(self):
       apply(self.func,self.args)
 
 
def super_player(file,time):
   for i in range(2):
       print 'Starting playing:%s!\t%s\n' %(file,ctime())
       sleep(time)
dic={"the sale oflove.mp3":3,"a fan da.mp4":6}
 
threads=[]
files=range(len(dic))
 
for file,time in dic.items():
   t=MyThread(super_player,(file,time),super_player.__name__)
   threads.append(t)
 
if __name__ == '__main__':
   for i in files:
       threads[i].start()
   for i in files:
       threads[i].join()
print 'end:%s' %ctime()

 

 

MyThread(threading.Thread)

创建MyThread 类,用于继承threading.Thread 类。

__init__() 类的初始化方法对func、args、name 等参数进行初始化。

apply() 当函数参数已经存在于一个元组或字典中时,间接地调用函数。args 是一个包含将要提供给函数的按位置传递的参数的元组。如果省略了args,任何参数都不会被传递,kwargs 是一个包含关键字参数的字典。

10.3 多进程技术

  10.3.1 mutiprocessing 模块

         multiprocessing 提供了本地和远程的并发性,有效的通过全局解释锁(Global Interceptor Lock, GIL)来使用进程(而不是线程)。。由于GIL 的存在,在CPU 密集型的程序当中,使用多线程并不能有效地利用多核CPU 的优势,因为一个解释器在同一时刻只会有一个线程在执行。

multiprocessing 模块的Process实现了多进程。process.py:

#coding=utf-8
from timeimport sleep, ctime
import multiprocessing
 
def super_player(f,time):
    for i in range(2):
        print 'Start playing...%s! %s'%(f,ctime())
        sleep(time)
   
dic={"love.mp3":3,"love.rmvb":4,"love.mp4":5}
process=[]
files=range(len(dic))
for f,time indic.items():
    t=multiprocessing.Process(target=super_player,args=(f,time))
    process.append(t)
   
if __name__ =='__main__':
    for i in files:
        process[i].start()
    #keep thread
    for i in files:
        process[i].join()
       
    print 'all end:%s'%ctime()


multiprocessing.Process 对象来创建一个进程。Process对象与Thread 对象的用法相同,也有start(),run(), join()的方法。multiprocessing.Process(group=None,target=None, name=None, args=(), kwargs={})target 表示调用对象,args 表示调用对象的位置参数元组。kwargs 表示调用对象的字典。Name 为别名。Group 实质上不使用。多程的结果显示是以进程

组为顺序进行显示的。在*nix 上面创建的新的进程使用的是fork

  10.3.2 Pipe 和 Queue

         multiprocessing 包中有Pipe 类和Queue 类来分别支持这两种IPC 机制。Pipe 和Queue 可以用来传送常见的对象。

    (1)Pipe可以是单向(half-duplex),也可以是双向(duplex),通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。一个进程从PIPE 一端输入对象,然后被PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。pipe.py: 注:本程序只能在linux/Unix上运行

#coding=utf-8
import multiprocessing
def proc1(pipe):
   pipe.send('hello')
   print('proc1 rec:',pipe.recv())
def proc2(pipe):
   print('proc2 rec:',pipe.recv())
   pipe.send('hello, too')
   
pipe = multiprocessing.Pipe()
p1 =multiprocessing.Process(target=proc1, args=(pipe[0],))
p2 =multiprocessing.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()

         这里的Pipe 是双向的。Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe 的一端(Connection 对象)。我们对Pipe 的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

 

         (2) Queue 与Pipe 相类似,都是先进先出的结构。但Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。queue.py: 注:本程序只能在linux/Unix上运行:

#coding=utf-8
import multiprocessing
import time,os
#input worker
def inputQ(queue):
   info = str(os.getpid()) + '(put):' + str(time.time())
   queue.put(info)
 
#output worker
def outputQ(queue,lock):
   info = queue.get()
   lock.acquire()
   print (str(os.getpid()) + '(get):' + info)
   lock.release()
 
#Main
record1 = [] # store input processes
record2 = [] # store outputprocesses
lock = multiprocessing.Lock()#addlock ,avoid to san luan daying
queue = multiprocessing.Queue(3)
#input processes
for i in range(10):
   process = multiprocessing.Process(target=inputQ,args=(queue,))
   process.start()
   record1.append(process)
 
#output processes
for i in range(10):
   process = multiprocessing.Process(target=outputQ,args=(queue,lock))
   process.start()
   record2.append(process)
   
for p in record1:
   p.join()
   
queue.close()
for p in record2:
p.join()

10.4 应用于自动化测试

     10.4.1 应用于自动化测试项目

为实现多进程运行测试用例,我需要对文件结构进行调整:Thread\TestProject

/test_project/thread1/baidu_sta.py -----测试用例

    /thread1/__init__.py

          /thread2/youdao_sta.py -----测试用例

 /thread2/__init__.py

               /report/ ----测试报告目录

            /all_tests_pro.py

                   创建了thread1 和thread2 两个文件夹,分别放入了两个测试用例; 下面我们编写

all_tests_process.py 文件来通过多进程来执行测试用例。test_all_pro.py:

#coding=utf-8
import unittest, time, os,multiprocessing
import HTMLTestRunner
def EEEcreatsuit():
   casedir=[]
   listaa=os.listdir('\\Users\\ewang\\Desktop\\Python_Selenium2\\Thread\\TestProject')
   print listaa
   for xx in listaa:
        if "thread" in xx:
           casedir.append(xx)
   
   suite=[]
   for n in casedir:
        testunit=unittest.TestSuite()
        discover=unittest.defaultTestLoader.discover(n,pattern ='test*.py',top_level_dir=n)
        print discover
        for test_suite in discover:
            for test_case in test_suite:
                testunit.addTests(test_case)
        suite.append(testunit)
 
   print "===casedir:%r====" %casedir
   print "+++++++++++++++++++++++++++++++++++++++++++++++"
   print "!!!suite:%r!!!" %suite
   return suite,casedir
 
 
def EEEEEmultiRunCase(suite,casedir):
   now =time.strftime("%Y-%m-%d-%H_%M_%S",time.localtime(time.time()))
   filename = '.\\report/'+now+'result.html'
   fp = file(filename, 'wb')
   proclist=[]
   s=0
   for i in suite:
        runner =HTMLTestRunner.HTMLTestRunner(stream=fp,title=u'测试报告',description=u'用例执行情况:' )
        proc =multiprocessing.Process(target=runner.run(i),args=(i,))
        proclist.append(proc)
        s=s+1
   for proc in proclist: proc.start()
   for proc in proclist: proc.join()
   fp.close()
 
if __name__ == "__main__":
   runtmp=EEEcreatsuit()
  EEEEEmultiRunCase(runtmp[0],runtmp[1])

定义casedir 数组,读取test_project 目录下的文件夹,找到文件夹的名包含“thread”的文件夹添加到casedir 数组中.

定位suite 数组,for 循环读取casedir数组中的数据(即thread1 和thread2 两个文件夹)。通过discover 分别读取文件夹下匹配test*.py 规则的用例文件,将所有用例文件添加到testunit 测试条件中,再将测试套件追加到定义的suite 数组中。

在整个EEEcreatsuite1()函数中返回suite 和casedir 两个数组的值。

定义proclist()函数,for 循环把suite数组中的用例执行结果写入HTMLTestRunner 测试报告。multiprocessing.Process 创建用例执行的多进程,把创建的多进程追加到proclist 数组中,for 循环proc.start()开始进程活动,proc.join()等待线程终止。

 10.4.2 应用于Grid2分布式测试

         Selenium Grid 只是提供多系统、多浏览器的执行环境,Selenium Grid 本身并不提供并行的执行策略。也就是说我们不可能简单地丢给SeleniumGrid 一个test case 它就能并行地在不同的平台及浏览器下运行。如果您希望利用Selenium Grid 分布式并行的执行测试脚本,那么需要结束Python 多线程技术。

启动Selenium Server

在本机打开两个命令提示符窗口分别:

启动一个hub (默认端口4444),ip 地址为:172.16.10.66

>java -jar selenium-server-standalone-2.39.0.jar -role hubtandalone-2.39.0.jar-role hub

启动一个本地node(节点)

>java -jar selenium-server-standalone-2.39.0.jar -role node -port 5555 -jarselenium-server-stan启动一个远程node(节点),ip 为:172.16.10.34

>java -jar selenium-server-standalone-2.39.0ar -role node -port 5555  -hubhttp://172.16.10.66:4444/grid/register

 fnngj@fnngj-VirtualBox:~/selenium$java -jar selenium-server-standalone-2.39.0ar -role node -po

grid_thread.py

</pre><pre name="code" class="python"><pre name="code" class="python">#coding=utf-8
from threading import Thread
from selenium import webdriver
import time
 
list ={'http://127.0.0.1:4444/wd/hub':'chrome',
           'http://127.0.0.1:5555/wd/hub':'internet explorer',
            'http://172.16.10.34:5555/wd/hub':'firefox'
            }
 
def test_baidu(host,browser):
        print 'start:%s' %time.ctime()
        print host,browser
        driver = webdriver.Remote( command_executor=host,
                                   desired_capabilities={'platform': 'ANY',
                                                         'browserName':browser,
                                                         'version': '',
                                                         'javascriptEnabled': True
                                  })
      driver.get('http://www.baidu.com')
      driver.find_element_by_id("kw").send_keys(browser)
      driver.find_element_by_id("su").click()
      sleep(2)
      driver.close()
 
threads = []
files = range(len(list))
for host,browser in list.items():
   t = Thread(target=test_baidu,args=(host,browser))
   threads.append(t)
if __name__ == '__main__':
   for i in files:
        threads[i].start()
   for i in files:
        threads[i].join()
print 'end:%s' %time.ctime()


</pre><pre>

到此,我们才真正解决了同一个用例不同的主机与浏览器下并行执行的目的。Grid主要实现在本机与远程主机启动多个节点的作用,要想实现并行的效果还需要借助Python的多线程技术。

© 著作权归作者所有

henni_719
粉丝 2
博文 466
码字总数 343938
作品 0
信阳
QA/测试工程师
私信 提问
python资料全集

python: 微信公众号开发小记——2.80端口上的服务 python: 微信公众号开发小记——3.接入三方登录 使用python编写一个壁纸网站的简单爬虫 python: python List 用法 Python 中各个时间复杂度...

d_watson
2016/04/15
185
0
ApacheCN 人工智能知识树 v1.0

Special Sponsors 贡献者:飞龙 版本:v1.0 最近总是有人问我,把 ApacheCN 这些资料看完一遍要用多长时间,如果你一本书一本书看的话,的确要用很长时间。但我觉得这是非常麻烦的,因为每本...

ApacheCN_飞龙
05/18
0
0
八月暑期福利,10本Python热门书籍免费送!

八月第一周,网易云社区联合博文视点为大家带来Python专场送书福利,10本关于Python的书籍内容涉及Python入门、绝技、开发、数据分析、深度学习、量化投资等。以下为书籍简介,送书福利请见文...

网易云
2018/08/02
0
0
筒子们,免费学习《Python自动化测试开发实践》课啦,请接住~

经常会有人问: 蛋哥,最近有没有什么免费的技术课啊? 蛋哥,最近有什么关于Python的课程么? 蛋哥,最近有没有测试的直播课额? 蛋哥,…… 应广大群众的强烈呼唤,为了让大家尽可能多的学...

蜗牛学院
2018/04/13
0
0
selenium webdriver (13) -- 结合pyunit生成测试报告

测试包含测试用例设计,测试执行,测试报告输出 测试用例设计一般是静态的,以文档的方式进行存储 测试执行,可以是手动的,也可以自动化用例执行 测试报告,可以是手动的,按照一定格式的测...

terry_hding
2016/08/04
288
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Boot + Mybatis-Plus 集成与使用(二)

前言: 本章节介绍MyBatis-Puls的CRUD使用。在开始之前,先简单讲解下上章节关于Spring Boot是如何自动配置MyBatis-Plus。 一、自动配置 当Spring Boot应用从主方法main()启动后,首先加载S...

伴学编程
昨天
7
0
用最通俗的方法讲spring [一] ──── AOP

@[TOC](用最通俗的方法讲spring [一] ──── AOP) 写这个系列的目的(可以跳过不看) 自己写这个系列的目的,是因为自己是个比较笨的人,我曾一度怀疑自己的智商不适合干编程这个行业.因为在我...

小贼贼子
昨天
7
0
Flutter系列之在 macOS 上安装和配置 Flutter 开发环境

本文为Flutter开发环境在macOS下安装全过程: 一、系统配置要求 想要安装并运行 Flutter,你的开发环境需要最低满足以下要求: 操作系统:macOS(64位) 磁盘空间:700 MB(不包含 IDE 或其余...

過愙
昨天
6
0
OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
昨天
2.7K
16
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
昨天
42
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部