# Fetch the lotus sources from GitHub.
git clone https://github.com/kevinspider/lotus.git
# Enter the checkout so that "." below refers to the project root.
cd lotus
# Install the package from the local checkout into the active Python environment.
pip install .
使用 curl_to_lotus.raycast 可以直接将 curl 命令转换为 lotus 代码。
# Single-threaded usage example: crawl one keyword/location pair and export
# whatever was collected to an Excel file.

# Search parameters for the first request.
keyword = "口罩"
location = "宁波"
page = 1

# State shared by every API instance through the class attribute below:
# the two token cookies plus the DataFrame that collects the results.
context = {
    '_m_h5_tk': 'd903c89c3ad363d9cdf68fc07d4c4565_1733980172025',
    '_m_h5_tk_enc': '974ad2908d599c5dc1673ef1570a71b5',
    'df': pd.DataFrame(),
}
API.context = context

# Start the crawl from the first page.
api = API(keyword, location, page)
api.download()

# Export the accumulated rows.
collected = API.context['df']
collected.to_excel("result.xlsx")
def parse(self, res: Response):
    """Store one page of results and continue with the next page.

    Args:
        res: Response of the request issued for ``self.page``.

    Returns:
        Whatever ``download()`` returns for the follow-up request (a retry
        of the same page after a token refresh, or the next page), or
        ``None`` when there is no further page.
    """
    # When is_update() reports True the same page is requested again
    # instead of being parsed.
    if self.is_update(res):
        return API(self.keyword, self.location, self.page).download()

    # Decode the body once and reuse it for both the loop and the log line
    # (the original decoded it a second time just to count the items).
    items = res.json()['data']['resultList']
    for each in items:
        # json_parse / merge_data are project helpers; presumably they pull
        # the 'exContent' record out of the item and append it to the
        # shared DataFrame -- confirm against their definitions.
        result = json_parse(each, ['exContent'])['exContent']
        API.context['df'] = merge_data(result, API.context['df'])
    logger.info(
        f"page is {self.page} items:{len(items)} totol:{len(API.context['df'])}")

    if self.has_next(res):
        return API(self.keyword, self.location, self.page + 1).download()
    return None
def is_update(self, res: Response):
    """Detect an expired-token reply and refresh the shared cookies.

    Args:
        res: Response to inspect.

    Returns:
        True when the body contains the token-expired marker (the shared
        context has then been updated with the cookies read from the
        response), False otherwise.
    """
    expired_marker = '["FAIL_SYS_TOKEN_EXOIRED::令牌过期"]'
    if expired_marker not in res.text:
        return False

    # Copy the two token cookies taken from the response into the
    # class-level context.
    API.context.update(res.get_set_cookies("_m_h5_tk", "_m_h5_tk_enc"))
    return True
def has_next(self, res: Response):
    """Report whether the response announces a further page.

    Args:
        res: Response whose JSON body is searched for ``hasNextPage``.

    Returns:
        The first ``hasNextPage`` value found anywhere in the JSON body, or
        False when the lookup yields nothing.
    """
    matches = jsonpath.jsonpath(res.json(), "$..hasNextPage")
    # A falsy lookup result means the key was not found.
    return matches[0] if matches else False
# Concurrency: task manager created with four workers, shared by all Api tasks.
manager = ThreadManager(max_workers=4)
Api.manager = manager

# Context: token cookies shared by all Api tasks.
context = ThreadContext(
    _m_h5_tk="d903c89c3ad363d9cdf68fc07d4c4565_1733980172025",
    _m_h5_tk_enc="974ad2908d599c5dc1673ef1570a71b5",
)
Api.context = context

# Search space: one first-page task per (city, keyword) combination.
cities = ["宁波"]
keywords = ["口罩"]
tasks = [
    Api(keyword=keyword, location=city, page=1)
    for city in cities
    for keyword in keywords
]
for task in tasks:
    manager.add_task("download", task)

# Collect the results of the submitted tasks.
results = manager.join_for_results()

# Data storage: fold every task result into one DataFrame and export it.
df = pd.DataFrame()
for each in results:
    df = merge_data(each, df)
df.to_excel("result.xlsx")
class Api(Spider):
    """Spider subclass that reaches the task manager through a class attribute."""

    # Shared task manager; stays None until it is assigned after the class body.
    manager: ThreadManager = None
    ...


# Create the manager once and expose it to every Api instance.
manager = ThreadManager(max_workers=4)
Api.manager = manager
在业务逻辑中, 如果需要新增任务, 可以使用 ThreadManager 暴露出来的 add_task 方法。
def parse(self, response: Response):
    """Turn one page response into the task result.

    Args:
        response: Response of the request issued for ``self.page``.

    Returns:
        None when ``is_ignore`` rejects the response; otherwise the value
        produced by ``json_parse`` for the fields area, price, title and
        itemId under ``$.data.resultList``.
    """
    logger.info(f"Parsing response for {self.keyword}, {self.location}, page {self.page} {response.text}")

    if not self.is_ignore(response):
        # Only the first page schedules the remaining pages.
        if self.page == 1:
            self.add_pages(response)
        fields = ["area", "price", "title", "itemId"]
        return json_parse(response.json(), fields, "$.data.resultList")
    return None
def add_pages(self, response: Response):
    """Queue a download task for every page after the first.

    The total item count is read from ``docNumWhenFirstPage`` in the JSON
    body and converted into a page count.

    Args:
        response: Response of the first page.

    Returns:
        None. Tasks are submitted through ``Api.manager.add_task``.
    """
    # Presumably the number of items the endpoint returns per page --
    # confirm against the API.
    page_size = 30

    matches = jsonpath.jsonpath(response.json(), "$..docNumWhenFirstPage")
    if not matches:
        # The lookup is falsy when the key is absent; indexing it would
        # raise, so there is simply nothing more to schedule.
        return

    sku_nums = matches[0]
    # Ceiling division: an exact multiple of page_size must not produce an
    # extra, empty trailing page (60 items -> 2 pages, not 3).
    total_page = -(-sku_nums // page_size)
    for page in range(2, total_page + 1):
        Api.manager.add_task("download", Api(self.keyword, self.location, page))
class Api(Spider):
    """Spider subclass that shares its token cookies through a class attribute."""

    # Shared context; stays None until one is assigned from outside the class.
    context: ThreadContext = None
    ...

    def is_ignore(self, response: Response):
        """Decide whether a response must be discarded because the token expired.

        Args:
            response: Response to inspect.

        Returns:
            True when the body contains the token-expired marker; in that
            case the shared context has been updated with the cookies read
            from the response and the same page has been queued again.
            False otherwise.
        """
        token_expired = '["FAIL_SYS_TOKEN_EXOIRED::令牌过期"]' in response.text
        if not token_expired:
            return False

        # Store the token cookies taken from the response, then queue the
        # same page again.
        fresh_cookies = response.get_set_cookies("_m_h5_tk", "_m_h5_tk_enc")
        Api.context.update(**fresh_cookies)
        retry_task = Api(self.keyword, self.location, self.page)
        Api.manager.add_task("download", retry_task)
        logger.info("Token expired, updated cookies and re-added task")
        return True