8000 GitHub - kevinspider/lotus
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

kevinspider/lotus

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

26 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

lotus

git clone https://github.com/kevinspider/lotus.git
cd lotus
pip install .

curl 转 lotus

使用 curl_to_lotus.raycast 直接将 curl 命令转为 lotus 代码;

单并发

类变量存储全局变量

context = {
    '_m_h5_tk': 'd903c89c3ad363d9cdf68fc07d4c4565_1733980172025',
    '_m_h5_tk_enc': '974ad2908d599c5dc1673ef1570a71b5',
    'df': pd.DataFrame()
}

API.context = context
keyword = "口罩"
location = "宁波"
page = 1

api = API(keyword, location, page)
api.download()

API.context['df'].to_excel("result.xlsx")

解析和逻辑放在 parse 函数中

def parse(self, res: Response):
    if self.is_update(res):
        return API(self.keyword, self.location, self.page).download()

    for each in res.json()['data']['resultList']:
        result = json_parse(each, ['exContent'])['exContent']
        API.context['df'] = merge_data(result, API.context['df'])

    logger.info(
        f"page is {self.page} items:{len(res.json()['data']['resultList'])} totol:{len(API.context['df'])}")

    if self.has_next(res):
        return API(self.keyword, self.location, self.page + 1).download()

def is_update(self, res: Response):
    if '["FAIL_SYS_TOKEN_EXOIRED::令牌过期"]' in res.text:
        cookies = res.get_set_cookies("_m_h5_tk", "_m_h5_tk_enc")
        API.context.update(cookies)
        return True
    return False

def has_next(self, res: Response):
    next_page = jsonpath.jsonpath(res.json(), "$..hasNextPage")
    if next_page:
        next_page = next_page[0]
        return next_page
    return False	

线程池

范式

# 并发
manager = ThreadManager(max_workers=4)

# 数据存储
df = pd.DataFrame()

# 上下文
context = ThreadContext(
    _m_h5_tk="d903c89c3ad363d9cdf68fc07d4c4565_1733980172025",
    _m_h5_tk_enc="974ad2908d599c5dc1673ef1570a71b5",
)

Api.context = context
Api.manager = manager

cities = ["宁波"]
keywords = ["口罩"]
tasks = [Api(keyword=keyword, location=city, page=1) for city in cities for keyword in keywords]

for task in tasks:
    manager.add_task("download", task)

results = manager.join_for_results()
for each in results:
    df = merge_data(each, df)

df.to_excel("result.xlsx")

线程管理使用 ThreadManager

class Api(Spider):
    manager: ThreadManager = None
    ...

manager = ThreadManager(max_workers=4)
Api.manager = manager

在业务逻辑中, 如果需要新增任务, 可以使用 ThreadManager 暴露出来的 add_task 方法

def parse(self, response: Response):
    logger.info(f"Parsing response for {self.keyword}, {self.location}, page {self.page} {response.text}")
    if self.is_ignore(response):
        return None
    if self.page == 1:
        self.add_pages(response)
    result = json_parse(response.json(), ["area", "price", "title", "itemId"], "$.data.resultList")
    return result

def add_pages(self, response: Response):
    sku_nums = jsonpath.jsonpath(response.json(), "$..docNumWhenFirstPage")[0]
    total_page = (sku_nums // 30) + 1
    for page in range(2, total_page + 1):
        # print("增加任务", page)
        Api.manager.add_task("download", Api(self.keyword, self.location, page))

全局变量使用 ThreadContext

class Api(Spider):
    context: ThreadContext = None
    ...

    def is_ignore(self, response: Response):
        if '["FAIL_SYS_TOKEN_EXOIRED::令牌过期"]' in response.text:
            cookies = response.get_set_cookies("_m_h5_tk", "_m_h5_tk_enc")
            Api.context.update(**cookies)
            Api.manager.add_task("download", Api(self.keyword, self.location, self.page))
            logger.info("Token expired, updated cookies and re-added task")
            return True
        return False

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published
0