As agent technology continues to evolve, enterprises and users are increasingly investing in developing and deploying agents to improve business efficiency.
However, because their training data has a cutoff date, large language models often cannot access the latest information on their own, making it difficult for them to fully meet the actual needs of agents.
To address this, agents often leverage RAG (Retrieval-Augmented Generation) technology to store and retrieve “new data.”
This “new data” includes not only internal enterprise information, such as IT processes and contracts, but also real-time information from the internet, such as stock prices and company announcements.
For internet data, a web crawler is needed to fetch pages and store them in a vector database for later retrieval and use by agents.
With Flame v0.5’s new Python API, we can easily build a high-concurrency web crawler to efficiently fetch the required internet information. Additionally, we can use Flame’s command-line tools to monitor the execution progress and results of crawling tasks in real time.
Crawler Server Code
In Flame v0.5, a new FlameInstance interface has been added. Using this interface and its Python decorator (FlameInstance.entrypoint), we can explicitly specify the application’s entry function. When a client initiates a request, Flame automatically schedules the request (task) to an appropriate application instance based on the cluster’s current resource utilization, achieving efficient resource allocation and task processing.
import io
import flamepy
import markitdown
import requests
from api import WebPage, Answer  # assumed to be defined in api.py

headers = {"User-Agent": "flame-crawler/0.5"}  # hypothetical; the original elides this definition
ins = flamepy.FlameInstance()

@ins.entrypoint
def crawler(wp: WebPage) -> Answer:
    # Fetch the page and convert its HTML into Markdown.
    text = requests.get(wp.url, headers=headers).text
    md = markitdown.MarkItDown()
    stream = io.BytesIO(text.encode("utf-8"))
    result = md.convert(stream).text_content
    ...
    return Answer(answer=f"Crawled {wp.url}")
The instance ins created by flamepy.FlameInstance is used to register the application’s entry function (def crawler(wp: WebPage) -> Answer). Upon receiving a web page request, the entry function downloads the content at the specified URL and converts it into a Markdown-formatted document. The converted Markdown can then be stored in a shared directory or vector database for later retrieval by agents, as sketched below.
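For instance, a minimal sketch of persisting the converted Markdown to a shared directory might look like the following; the save_markdown helper, the output directory, and the URL-based naming scheme are illustrative assumptions, not part of Flame’s API.

from pathlib import Path
from urllib.parse import quote

def save_markdown(url: str, content: str, out_dir: str = "/opt/shared/pages") -> Path:
    # Percent-encode the URL to get a filesystem-safe file name.
    path = Path(out_dir) / f"{quote(url, safe='')}.md"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
    return path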
Crawler Client Code
In Flame v0.5, the client’s Python interface remains unchanged. The client creates a session with the application using flamepy.create_session, then submits requests (tasks) to the application through the session.
Since flamepy uses asyncio, the client can asynchronously submit multiple requests (tasks) and wait for all tasks to complete using asyncio.gather.
import asyncio
import flamepy
from api import WebPage  # assumed to be defined in api.py

async def main(urls: list[str]):
    # Create a session with the deployed crawler application.
    crawler = await flamepy.create_session("crawler-app")
    tasks = []
    for url in urls:
        tasks.append(crawler.invoke(WebPage(url=url)))
    await asyncio.gather(*tasks)  # wait for every crawl task to finish
    await crawler.close()
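To drive the client, one might supply a URL list and run the coroutine with asyncio.run; the URLs here are purely hypothetical.

if __name__ == "__main__":
    # Hypothetical pages to crawl; in practice these might come from a feed or task queue.
    urls = [
        "https://example.com/announcements",
        "https://example.com/stock-prices",
    ]
    asyncio.run(main(urls))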
Execution Results
Starting the Flame Cluster
For demonstration purposes, Flame’s codebase includes a compose.yaml configuration file that allows users to start a Flame cluster locally, as shown below:
k82cn$ docker compose up -d
[+] Running 4/4
 ✔ Network flame_default                      Created    0.0s
 ✔ Container flame-flame-executor-manager-1   Started    0.3s
 ✔ Container flame-flame-console-1            Started    0.4s
 ✔ Container flame-flame-session-manager-1    Started    0.3s
Once the Flame cluster is running, you can log into the flame-console container to check the cluster status:
k82cn$ docker compose exec -it flame-console /bin/bash
root@2818097c3900:/# flmctl list -a
Name     State    Tags  Created   Shim  Command
flmexec  Enabled        13:37:48  Host  /usr/local/flame/bin/flmexec-service
flmping  Enabled        13:37:48  Host  /usr/local/flame/bin/flmping-service
root@2818097c3900:/#
Deploying the Crawler Application
Once the application is developed, you can deploy it to the Flame cluster using a YAML configuration file.
The configuration must specify the application name, working directory, and command line. Clients create sessions using the application name, while Flame starts the application on resource nodes using the command line and other configuration parameters.
root# cat crawler-app.yaml
metadata:
  name: crawler-app
spec:
  working_directory: /opt/examples/crawler/
  environments:
    FLAME_LOG_LEVEL: DEBUG
  command: /usr/bin/uv
  arguments:
    - run
    - crawler.py
    - api.py
Once the application configuration is created, you can register the application with the Flame cluster using flmctl.
root# flmctl register -f ./crawler-app.yaml
root# flmctl list -a
Name         State    Tags  Created   Shim  Command
flmexec      Enabled        13:37:48  Host  /usr/local/flame/bin/flmexec-service
flmping      Enabled        13:37:48  Host  /usr/local/flame/bin/flmping-service
crawler-app  Enabled        13:51:53  Host  /usr/bin/uv
Running
Once the crawler application is deployed, clients can send requests (tasks).
root# uv run client.py api.py
......
Once all crawling tasks are completed, you can use the flmctl tool to view the detailed status of each session and task. In this example, the Markdown documents have been saved to the local file system.
root# flmctl list -s
ID  State   App          Slots  Pending  Running  Succeed  Failed  Created
1   Closed  crawler-app  1      0        0        9        0       13:57:03
root# flmctl view -s 1 -t 1
Task: 1
Session: 1
Application: crawler-app
State: Succeed
Events:
13:57:08.549: Running task on host <78f3468f8bc8>. (1)
13:57:09.187: Task completed successfully on host <78f3468f8bc8>. (2)
root# flmctl view -s 1 -t 2
Task: 2
Session: 1
Application: crawler-app
State: Succeed
Events:
13:57:08.676: Running task on host <78f3468f8bc8>. (1)
13:57:09.735: Task completed successfully on host <78f3468f8bc8>. (2)
Summary
Flame v0.5’s Python API greatly simplifies the development process and enables efficient integration with third-party libraries. By leveraging Flame’s excellent scheduling and high-concurrency capabilities, users can easily build applications capable of handling large-scale requests.