1.最近工作中遇到一个问题,需要多线程去解决,Casandra数据库表中数据量太大的话使用count(*)统计数量会导致超时问题,可是有需求必须统计数据量,所以想了个解决方案,把一天的查询时间切分为一小时的数组,使用多线程统计每一小时的数据量,每个线程将结果写入到队列中,最后将队列中的值聚合运算就拿到了总的数据量,这其中遇到一个问题就是多线程无法直接拿到返回值,多线程写入到队列中的是一个多线程对象,解决方案就是写一个获取返回i值的类,每次多线程初始化一个取值类,线程的结果通过类方法取值之后保存到队列中。
class Value:def __init__(self):self.value = Nonedef get_value(self):return self.valuedef run(self,cql2,starttime,endtime,*args):res = CassandraDb().select_data(cql2,starttime,endtime,*args)try:for i in res:mes_num = i.countexcept:mes_num = 0self.value = mes_numreturn self.valuedef message_num(start_time,end_time,cql2,*args):time_list = []for i in range(start_time,end_time+1,3600000):time_list.append(i)time_list.append(end_time)result = []for ti in time_list:va = Value()if ti <= time_list[-1]:if ti +3600000>time_list[-1]:starttime = tiendtime = time_list[-1]t = Thread(target=va.run,args=(cql2,str(starttime),str(endtime),*args))t.start()t.join()result.append(va.get_value())breakelse:starttime = tiendtime = ti + 3600000t = Thread(target=va.run,args= (cql2,str(starttime),str(endtime),*args))t.start()t.join()result.append(va.get_value())return sum(result)
2.还有一种方法是使用ThreadPoolExecutor线程池,线程池中有一个result()方法可以直接拿到线程返回方法。线程池用起来还是简单方便的
"""desc:
"""import time
import random
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completeddef func1():rs = []for i in range(100):rs.append(uuid.uuid1())time.sleep(5)return rsdef func2():rs = []for i in range(100):rs.append(i)time.sleep(5)return rsdef func3(a):rs = []for i in range(100):rs.append(random.random())time.sleep(5)return rsdef main():rs1 = func1()rs2 = func2()rs3 = func3(2)print(rs1, rs2, rs3)def main1():func_list = [(func1, None), (func2, None), (func3, 3)]with ThreadPoolExecutor(max_workers=len(func_list)) as t:obj_list = []for func_tuple in func_list:func, args = func_tupleobj = t.submit(func, args)obj_list.append(obj)for future in as_completed(obj_list):data = future.result()print(data)if __name__ == "__main__":start = time.time()main1()print("耗时: ", time.time() - start)