Saya mencoba merasakan file dengan DAG aliran udara, tetapi filesensor saya selalu terjebak pada status antrian. Saya telah mencoba dengan sampel kode di bawah ini. Apakah ada yang saya lewatkan? BTW, versi aliran udara saya adalah 2.0.1. from airflow.contrib.sensors.file_sensor import FileSensor fr....
Saat ini saya menggunakan Airflow Task Flow API 2.0. Saya memiliki masalah menggabungkan penggunaan tugas taskgroup dan branchpythonoperator. Di bawah ini adalah kode saya: import airflow from airflow.models import DAG from airflow.decorators import task, dag from airflow.operators.dummy_operator im....
27 Mei 2021, 13:38
Saat ini saya sedang menyiapkan pekerja jarak jauh melalui Docker Swarm for Apache AirFlow pada Compon AWS EC2. Seorang pekerja terpencil dimatikan setiap 60 detik tanpa alasan yang jelas dengan kesalahan berikut: BACKEND=postgresql+psycopg2 DB_HOST=postgres DB_PORT=5432 BACKEND=postgresql+psycopg2....
Saya telah mengatur DAG dengan parameter berikut local_tz = pendulum.timezone('US/Eastern') default_args = { 'retries': 3, 'retry_delay': timedelta(minutes=5) } dag = DAG( dag_id='some_dag', start_date=datetime(2021, 1, 8, tzinfo=local_tz), schedule_interval='0 16 8 * *', ....
26 Mei 2021, 04:40
Kode: from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.providers.google.cloud.hooks.gcs import GCSHook class GCSUploadOperator(BaseOperator): @apply_defaults def __init__( self, bucket_name, target_file_name, ....
25 Mei 2021, 12:59
Hai saya punya dag sederhana yang menggunakan BashOperator dan DockerOperator. Saya ingin memicu dari skrip python seperti: import requests import json from datetime import datetime from pprint import pprint headers = { 'accept':'application/json', 'content-type':'application/json', } auth....
23 Mei 2021, 10:29
Saya telah menginstal aliran udara di server yang menjalankan Ubuntu dan Python 3.8. Saya mencoba mengimpor DAG sederhana di UI aliran udara untuk membuat daftar file di ember. from airflow import DAG from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator from airflow....
23 Mei 2021, 08:53
Saya telah berhasil menginstalnya di CentOS7 di VMware sebelumnya. Namun, dengan cara yang sama, ada masalah menginstal secara manual dari CentOS7 di Docker. (Build official CentOS.) (venv) [jykim@0f0090962efa dev]$ cat /etc/*release* CentOS Linux release 7.9.2009 (Core) Ketika aliran udara dipasan....
21 Mei 2021, 09:26
Saya menulis DAG dengan subdag di dalamnya: from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.subdag import SubDagOperator from pendulum import datetime def subdag(parent_dag: DAG): with DAG("{}.SubDag".format(parent_dag.dag_id), defaul....
20 Mei 2021, 18:15
Saya melihat kesalahan di bawah ini terjadi: Traceback (panggilan terakhir terakhir): File "/usr/local/lib/python3.8/site-packages/irflow/models/taskinstance.py", line 1138, di _run_raw_task self._prepare_and_execute_callbacks (konteks, tugas) "/ usr / local / lib / python3.8 / situs-paket / aliran ....
20 Mei 2021, 16:32
Katakanlah saya memiliki dua tugas yang menggunakan dua versi, katakanlah, panda #my_task_one import pandas as pd #Pandas 1.0.0 def f1(data): . . return 0 Dan #my_task_two import pandas as pd #version 2.0.0 def f2(data): . . return 0 Dalam aliran udara saya (lokal, tidak....
20 Mei 2021, 10:31
Apakah ada cara saya dalam aliran udara (mungkin file konfigurasi) dapat menambahkan /path/to/folder/ sebagai folder secara global? Saat ini saya harus menulis import sys sys.path += ["/path/to/folder/"] Di bagian atas semua danggs saya, yang jelas tidak terlalu baik, kalau-kalau saya mengubah fold....
20 Mei 2021, 09:50
Baru dengan aliran udara di sini, dan bertanya-tanya apakah ada ikon legenda di Misalnya, dalam contoh example_complex dari Mulai tutorial: ....
19 Mei 2021, 23:58
Sebagai pemula ke aliran udara, saya sedang melihat example_branch_operator: """Example DAG demonstrating the usage of the BranchPythonOperator.""" import random from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.python import BranchPythonOperator from....
19 Mei 2021, 23:44
Kami pindah ke AirFlow 2.0 dan saya perhatikan kesalahan di bawah ini, tampaknya Snowflakehook tidak dapat membaca kueri yang terletak di direktori 'SQL' kami, ini berjalan dengan baik di udara 1.x: Snowflake.connector.Errors.ProgrammingError: 001003 (42000): 019C5AC7-0602-31B5-0000-01B526E4FA46: SQ....
Saya mencoba mengimplementasikan SLA dalam DAG aliran udara saya. Saya tahu bagaimana cara kerja SL, Anda menetapkan objek Timedelta dan jika tugasnya tidak dilakukan dalam durasi itu, itu akan mengirim email dan memberi tahu bahwa tugas belum dilakukan. Saya ingin beberapa fungsi yang sama, tetapi ....
Kode: import datetime import logging from airflow import DAG from airflow.operators.python_operator import PythonOperator def hello_world(ti, execution_date, **context): logging.info("Hello World") return "Gorgeous" def addition(ti, **context): # Want belows are same each other l....
18 Mei 2021, 02:49
Saya memiliki cluster aliran udara yang terbuat dari 3 node pekerja dengan selysexecutor dan rabbitmq untuk komunikasi. Danggs saya biasanya terdiri dari tugas-tugas yang mengunduh file, unzip mereka, mengunggahnya ke Hadoop, dll. Jadi mereka bergantung satu sama lain dan harus berjalan pada mesin t....
Saya seorang pemula sejauh menyangkut aliran udara dan pelabuhan; Untuk membuat segalanya lebih rumit, saya menggunakan astronom, dan untuk memperburuk keadaan, saya menjalankan aliran udara pada Windows. (Tidak pada subsistem Unix - tidak dapat menginstal Docker di Ubuntu 20.4). "Astro dev memulai"....
17 Mei 2021, 22:50
Saya dulu membuat tugas dengan operator Python dan mengambil eksekusi dalam aliran udara 1 sebagai berikut def task(**kwargs): date = kwargs['execution_date'] Apa cara yang benar untuk melakukannya dengan API Task Flow baru? (mungkin melewatkannya) Terima kasih....
14 Mei 2021, 20:36
Saya mengikuti langkah-langkah dari menjalankan aliran udara Ubuntu 20.04 (TypeError: Bidang wajib "Type_ignores" hilang dari modul````, dan menerima yang berikut: (airflow-uGvev7QO) root@testing2:/opt/airflow# airflow db init DB: sqlite:////root/airflow/airflow.db [2021-03-30 21:17:43,978] {db.py:6....
31 Maret 2021, 04:21