Ethan's Values
[MLOps Engineer, Airflow] Argument 사용하여 Dag 실행하기 본문
airflow를 사용하여 데이터 삽입, 전처리, 모델링, 검증 까지의 일련의 과정을 코딩해둔 파일을 통해 스케쥴링만 하는 환경이 단순하고 정적일 수 있다. 모델링 시 학습 변수(input), Target 변수의 타입, 스케쥴링 설정 등 적절한 옵션을 통해 작업을 수행해야 할 때 Airflow의 기능 중 Argument를 사용해서 DAG를 실행할 수 있다.
실제 사용 화면
DAG 파일 작성 시 아래 모듈을 통해 Airflow 웹 상에서의 DAG 실행 시 아래 Action을 클릭
아래와 같은 화면이 나타난다. 아래 화면은 DAG 파일 코딩 시 Custom하게 변경할 수 있다.
아래 화면은 학습할 입력변수 input 과 Target 변수를 각 DB 테이블에서 불러와 key값으로 병합하여 학습된다.
해석
- input: iris_data 테이블에서 id, petal_length, petal_width 변수 사용
- target: iris_data 테이블에서 id, targer 변수 사용
- key 값은 id
- scaler는 StandardScaler를 사용해 정규화
이후, Trigger를 클릭하면 입력한 내용을 자료형 대로 airflow에서 Param에서 받게 된다.
input과 target은 Dic 형태로 작성했기 때문에 Param에서 Dic 형태로 가지고 있게 된다.
이 입력 내용을 함수의 형태로 불러와 사용할 수 있다. 코드로 더 자세하게 설명
설계 코드
airflow의 모듈인 Param, ParamDict를 불러오기.
from airflow.models.param import Param, ParamsDict
화면정의 코드
위 화면을 정의하는 코드 부분입니다.
#DAG 정보 입력
with DAG(
dag_id = "ModelDevelop_custom",
default_args = default_args,
schedule=None,
params={
"input":
Param(
{},
type = "object"
),
"target":
Param(
{},
type = "object"
),
"key":
Param(
{},
type = "string"
),
"scaler":
Param(
{},
type = "string"
),
},
) as dag:
코드 해석
DAG를 정의하는 부분은 동일함.
params 변수를 정의하여 input, target 변수는 object형(Json, Dic 형태임)으로 정의하고, key, scaler는 String형으로 정의.
Param 사용 코드
airflow 사용 시 Trigger 되어 변수들의 입력값을 받은 후 사용하는 방법입니다. context[params]를 확인해보면 다음과 같은 Json 형태가 출력값으로 나오는 것을 확인할 수 있다.
#파라미터 확인
print(context["params"])
->
{'input': {'iris_data': ['id', 'petal_length', 'petal_width']}, 'target': {'iris_data': ['id', 'target']}, 'key': 'id', 'scaler': 'StandardScaler'}
params 변수를 context["params"]로 정의하여 딕셔너리로 사용함
여기서 ParamsDict는 airflow의 airflow.models.param.ParamsDict 타입임. 여기서는 타입힌팅으로 사용되어, 아무런 역할을 하지 않고 params 변수의 타입을 명시해둔것 임.
#params 변수 정의
#ParamsDict은 airflow의 airflow.models.param.ParamsDict 타입으로 타입힌팅으로써, 아무런 역할을 하지 않고 params 변수의 타입을 명시해둔것 임.
params: ParamsDict = context["params"]
#input 파라미터 확인
input_dict = params.get("input")
print(input_dict)
->
{'iris_data': ['id', 'petal_length', 'petal_width']}
'MLOps' 카테고리의 다른 글
MLOps Engineering(8) - Fast API를 사용하여 모델의 예측값 반환하는 REST API 구현 (0) | 2024.03.27 |
---|---|
MLOps Engineering(7) - 모델링 수행을 위한 스케쥴링 DAG (2) | 2024.03.26 |
MLOps Engineering(6) - 데이터 삽입 DAG 생성 (0) | 2024.03.25 |
MLOps Engineering(5) - docker-compose up 시 테이블 자동 생성 (0) | 2024.03.22 |
MLOps Engineering(4) - Airflow compose-yaml 파일로 개발환경구성및 Airflow 스케쥴링 DAG 생성 개요 (0) | 2024.03.22 |