Flyte & StreamFlow
October 2024
Goal: Astrophysical simulations for exascale computing
Challenge: Analyze large-size simulation data (~ Petabytes)
Solution: Efficient ML-based workflows using dimensionality reduction
Input
Simulation Data (~ Petabytes)
Time snapshots storing particle positions, velocities, and other properties, e.g. from OpenGADGET (IllustrisTNG)
Workflow
Output
Spherinator & HiPSter
Note
BioExcel Building Blocks is a great example using CWL for biomolecular simulation.
workflow.cwl
cwlVersion: v1.2
class: Workflow

inputs:
  epochs: int
  train_dataset: Directory
  eval_dataset: Directory
  train_script: File
  eval_script: File

outputs:
  model:
    type: File
    outputSource: train/model
  accuracy:
    type: string
    outputSource: evaluate/accuracy

steps:
  train:
    run: train.cwl
    in:
      train_script: train_script
      epochs: epochs
      dataset: train_dataset
    out:
      - model
      - loss
  evaluate:
    run: eval.cwl
    in:
      eval_script: eval_script
      dataset: eval_dataset
      model: train/model
    out:
      - accuracy
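Assuming hypothetical local paths, a matching job file for this workflow could look like the sketch below; it would then be executed with a CWL runner such as cwltool via `cwltool workflow.cwl job.yml`.

```yaml
# job.yml -- hypothetical example inputs for workflow.cwl
# (paths and epoch count are illustrative, not from the slides)
epochs: 10
train_dataset:
  class: Directory
  path: data/train
eval_dataset:
  class: Directory
  path: data/eval
train_script:
  class: File
  path: train.py
eval_script:
  class: File
  path: eval.py
```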
train.cwl
cwlVersion: v1.2
class: CommandLineTool
baseCommand: [python]

inputs:
  train_script:
    type: File
    inputBinding:
      position: 1
  dataset:
    type: Directory
    inputBinding:
      position: 2
      prefix: --dataset
  epochs:
    type: int
    inputBinding:
      position: 3
      prefix: --epochs

outputs:
  model:
    type: File
    outputBinding:
      glob: state_dict_model.pt
  loss:
    type: string
    outputBinding:
      glob:
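train.cwl invokes a script as `python train.py --dataset DIR --epochs N` (the script file itself is the positional argument to python) and collects state_dict_model.pt afterwards. A minimal argparse skeleton for such a script might look as follows; the training loop itself is a placeholder, not the actual Spherinator code.

```python
import argparse
from pathlib import Path

def parse_args(argv=None):
    # --dataset and --epochs mirror the prefixed inputBindings of train.cwl.
    parser = argparse.ArgumentParser(description="Hypothetical training entry point")
    parser.add_argument("--dataset", type=Path, required=True,
                        help="Directory containing the training snapshots")
    parser.add_argument("--epochs", type=int, default=1,
                        help="Number of training epochs")
    return parser.parse_args(argv)

def main(argv=None):
    args = parse_args(argv)
    # Placeholder: a real script would load the dataset, run the
    # training loop, and save the PyTorch state dict so that the
    # CWL outputBinding (glob: state_dict_model.pt) can pick it up:
    # torch.save(model.state_dict(), "state_dict_model.pt")
    print(f"training on {args.dataset} for {args.epochs} epochs")

if __name__ == "__main__":
    main()
```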
Thanks to Martin Wendt (ITS) for providing a comprehensive testing setup including Kubernetes (k3s), Flyte, and MinIO.
Flyte organizes deployments into domains (development, staging, and production)
Tasks, declared with @task, are the basic units of execution
Workflows are used to structure the task execution graph
@task
def generate_processed_corpus() -> List[List[str]]:
@task
def train_word2vec_model(training_data: List[List[str]], hyperparams: Word2VecModelHyperparams) -> model_file:
@task
def train_lda_model(corpus: List[List[str]], hyperparams: LDAModelHyperparams) -> Dict[int, List[str]]:
@task
def word_similarities(model_ser: FlyteFile[MODELSER_NLP], word: str) -> Dict[str, float]:
@task
def word_movers_distance(model_ser: FlyteFile[MODELSER_NLP]) -> float:
@workflow
def nlp_workflow(target_word: str = "computer") -> Tuple[Dict[str, float], float, Dict[int, List[str]]]:
    corpus = generate_processed_corpus()
    model_wv = train_word2vec_model(training_data=corpus, hyperparams=Word2VecModelHyperparams())
    lda_topics = train_lda_model(corpus=corpus, hyperparams=LDAModelHyperparams())
    similar_words = word_similarities(model_ser=model_wv.model, word=target_word)
    distance = word_movers_distance(model_ser=model_wv.model)
    return similar_words, distance, lda_topics
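Stripped of flytekit, the dependency structure of this workflow can be sketched with plain functions. The bodies below are toy stand-ins, not the real task implementations; the point is the call graph: in Flyte, calling a @task inside a @workflow records an edge in the execution DAG rather than running the function immediately, so the two branches after the corpus step can execute in parallel.

```python
from typing import Dict, List, Tuple

# Toy stand-ins for the Flyte tasks above (illustrative only).
def generate_processed_corpus() -> List[List[str]]:
    return [["human", "computer", "interface"], ["graph", "trees"]]

def train_word2vec_model(training_data: List[List[str]]) -> str:
    # A real task would train a Word2Vec model and serialize it.
    return "word2vec.model"

def train_lda_model(corpus: List[List[str]]) -> Dict[int, List[str]]:
    return {0: ["computer", "interface"]}

def word_similarities(model: str, word: str) -> Dict[str, float]:
    return {word: 1.0}

def word_movers_distance(model: str) -> float:
    return 0.0

def nlp_workflow(target_word: str = "computer") -> Tuple[Dict[str, float], float, Dict[int, List[str]]]:
    corpus = generate_processed_corpus()        # no upstream dependency
    model = train_word2vec_model(corpus)        # depends on corpus
    lda_topics = train_lda_model(corpus)        # depends on corpus (parallel branch)
    similar_words = word_similarities(model, target_word)
    distance = word_movers_distance(model)
    return similar_words, distance, lda_topics
```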
A Flyte task operates within its own container and runs on a Kubernetes pod
StreamFlow
Pros
Cons
Flyte
Pros
Cons
The background image was generated with FLUX on fal.ai
using the prompt ‘Visualize two worlds showing the difference between HPC and AI infrastructure’
HPC workload
Simulations, modeling, and data analysis
Large clusters of CPUs and high-speed interconnects
Parallel computing (MPI)
Scheduling with Slurm
Bare-metal compilation
Distributed file storage
AI/ML workload
Training and inference of models
Data-intensive tasks
Numerous matrix operations require accelerators (GPU, TPU, …)
Cloud-native deployment
Kubernetes and containers
Object storage
HPC and AI/ML workloads differ significantly in their hardware and software requirements and in the infrastructure they run on.
File Storage
~ 1-4 GB/s (READ)
Block Storage
~ 5-10 GB/s (READ)
Object Storage
Unstructured data (e.g., images, videos, logs)
RESTful API (cloud-native)
S3 (AWS), MinIO
Objects can’t be modified
~ 50 GB/s (GET, MinIO benchmark, 8 storage nodes)
Can be scaled up to TB/s
ML Workflow Orchestration (Bernd Doser, HITS)