Skip to content

Instantly share code, notes, and snippets.

@IvanildoBarauna
Created June 6, 2024 21:26
Show Gist options
  • Save IvanildoBarauna/05be14ff93cb020978604873915ca2cb to your computer and use it in GitHub Desktop.
Save IvanildoBarauna/05be14ff93cb020978604873915ca2cb to your computer and use it in GitHub Desktop.
from enum import Enum
from datetime import datetime, timezone
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import logging
logging.getLogger("__name__")
def logger(function):
def inner(*args, **kwargs):
print(f"{datetime.now(timezone.utc)} ::: Antes de chamar a função: {function.__name__}")
result = function(*args, **kwargs)
print(f"{datetime.now(timezone.utc)} ::: Depois de chamar a função: {function.__name__}")
return result
return inner
class pipelineType(Enum):
EXTRACT = 1
TRANSFORM = 2
LOAD = 3
class pipeline:
def __init__(self, type: pipelineType) -> None:
pass
@logger
def BeamObj(self):
return beam
@logger
def PipelineDirectRunner(self):
opts = PipelineOptions([f"--runner", "Direct", f"--direct_num_workers={1}"])
return beam.Pipeline(options=opts) # type: ignore
NewPipeline = pipeline(type=pipelineType.EXTRACT)
beamObj = NewPipeline.BeamObj()
beamOpts = NewPipeline.PipelineDirectRunner()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment