Study/Java

Spring Cloud DataFlow 사용해 보기

Bluesky_ 2023. 7. 3. 17:57
반응형

Spring Cloud DataFlow 소개

https://dataflow.spring.io/

Spring Cloud DataFlow 는 data-processing use case에 중점을 두고 application 개발과 배포를 간소화한다.

이를 위해 여러 Spring의 여러 라이브러리가 조합되어 기능을 제공한다.

간략하게 소개하면 Spring Cloud DataFlowSpring IntegrationSpring Batch 로 작성된 Spring Application을 등록하고 Cloud Platform 배포를 관리하는 도구이다.

Spring Cloud Dataflow는 두 가지 방식의 data-processing을 처리한다.

Streams - CDC (Change Data Capture)의 실시간 또는 준 실시간 데이터 처리 작업

Spring Integration 을 생각하면 되며 이를 사용한 Spring Cloud Stream application의 등록 및 배포를 관리한다.

데이터를 가져오고 처리하고 다시 내보내는 과정이 각각 독립적인 application이며 각 단계 사이에 rabbitmq , kafka 같은 message brocker를 사용하여 데이터를 전달하게 된다.

각 application은 계속 떠 있으며 지속적으로 데이터를 가져와서 가공하고 내보낸다.

각 단계는 Source, Processor, Sink 로 부르며 Spring Cloud DataFlow 에서는 각각 개별 application으로 등록하고 이를 연결하여 사용한다.

Task/Jobs - ETL (Extract, Transform, Load)의 일괄 처리 배치 (task) 작업

Spring Batch 를 생각하면 되며 이를 사용한 Spring Cloud Task application의 등록 및 배포를 관리한다.

batch application이 실행되면 한 번에 대량의 데이터를 가져와 처리하고 내보낸 후 application이 종료되는 형태로 주기적으로 해당 application을 실행하는 형식이다.

batch는 ItemReader , ItemProcessor , ItemWriter 로 구성되며 Spring Cloud DataFlow 에서는 Task application으로 등록하여 사용한다.

사용 방법에 대한 간단한 소개

Spring Cloud DataFlow의 설치가 복잡하므로 설치를 소개하기 전에 간단하게 Spring Cloud DataFlow가 어떤 것인지 간략하게 소개해본다.

설치가 된 Spring Cloud DataFlow의 Dashboard를 살펴보면 다음과 같다.

4개의 메뉴가 있는데 간략하게 설명하면 다음과 같다.

  • Application : 사용할 Spring Applicaiton 등록 (Stream, Task application을 등록)
  • Stream : stream 생성 및 배포 관리
  • Tasks / Jobs : Task 생성 및 배포 관리
  • Manage : 관리 메뉴

Application 등록

data processing 방식을 사용하기 위해 아래와 같은 Type의 Application을 등록하게 된다.

  • Streams
    • Source
    • Processor
    • Sink
  • Tasks / Jobs
    • Task

Applicaton 메뉴에서 Add Application을 선택하고 상황에 맞는 Application을 등록한다.

빌드된 Docker Image 또는 Maven jar 를 등록하여 사용할 수 있다.
(Cloud Platform 배포를 위해 사용하므로 Docker Image를 사용하면 좋을 듯싶다.)

등록 시 Source , Processor , Sink , Task 같은 Type을 지정하여 등록하게 된다.

Application이 등록되면 등록된 항목을 확인할 수 있다.

이렇게 등록된 Application은 Streams , Task / Jobs 메뉴에서 생성 시 지원하는 Type에 대한 Application을 선택할 수 있게 노출이 된다.

Stream , Tasks / Jobs 두 가지 방식의 Spring Application 이외의 Application도 App type으로 등록하여 사용할 수 있다.

Stream 생성 및 배포해 보기

예를 들어 Stream 메뉴에서 create stream을 선택하면

다음과 같이 Stream 생성 관련 항목을 선택할 수 있게 표시된다.

각 단계에 맞는 application을 선택하고 우측으로 드래그하여 배치하고 각 application 사이를 연결하면 된다.

SourceSink 는 필수이고 중간 과정인 Processor 는 사용하지 않을 수도 있다.

우측 화면에서 사용하고자 하는 Source , Processor , Sink 를 배치한 후 각 단계 사이를 연결해 주고 하단의 Create Stream 을 선택하면 생성할 Stream의 이름을 지정하는 레이어가 뜬다.

생성을 하면 목록에 생성한 Stream이 표시된다.

해당 Stream을 선택하면 해당 Stream에 대한 정보와 배포 버튼을 확인할 수 있다.

상단의 Deploy Stream 버튼을 누르면 배포할 환경과 배포 시 설정할 값들을 추가할 수 있는 항목이 뜬다.

배포를 하면 설정한 cloud platform에 배포가 된다.

bluesky-study-scdf-bluesky-study-scdf-source-http-v22-86d8w6wgw   1/1     Running            0                39s
bluesky-study-scdf-bluesky-study-scdf-sink-log-v22-568597chzdg9   1/1     Running            0                39s

Stream의 경우 앞서 말한 것처럼 Source, Processor , Sink 가 각각 개별 application으로 구성되기 때문에 pod도 각각 구성된다.

하지만 Spring Cloud DataFlow 에서는 설정된 하나의 Stream에서 Source , Processor , Sink 에 대한 배포를 묶어서 관리한다.

배포를 하고 나면 Stream 정보를 다시 확인하면 다음과 같이 변경된 것을 확인할 수 있다.

배포 설정을 변경하거나 배포된 pod의 수를 변경하거나 배포를 다시 취소할 수 있으며 각각의 pod의 로그를 확인할 수 있다.

그보다 더 아래쪽에는 배포 히스토리를 확인할 수 있고 이전 버전의 배포로 롤백을 할 수도 있다.

또한 Stream의 Runtime 항목에서 현재 배포된 Stream이 무엇이 있는지 확인할 수 있다.

Stream 구성 추가 설명

하나의 Stream을 등록하고 배포하는 것까지 살펴보았는데 만약 Stream을 생성할 때 Source , Processor , Sink 중 동일 타입을 분기하여 2개 이상 설정하면 분기된 만큼 별도의 Stream이 생성된다.

분기된 Stream에 대해서는 Create Stream 시 이름을 각각 지정하는 입력창이 뜬다.

예를 들어 아래처럼 Stream을 구성하면

Create Stream 시 2개의 Stream의 이름을 지정하라는 레이어가 뜬다.

Source , Processor , Sink 하나씩이 하나의 Stream 묶음이며 분기가 있으면 Spring Cloud DataFlow는 별도의 Stream을 생성하여 등록한다.

등록 Application의 Spring Cloud DataFlow 설정

application을 등록하고 Stream을 생성하여 배포하는 것까지 Spring Cloud DataFlow가 처리하는 것을 훑어보았다.

등록할 대상 Application을 어떻게 만들어야 하는지에 대한 설명은 하지 않았었다.

Spring Cloud DataFlow를 사용하지 않고 Spring Integration만 사용한다면 Source , Processor , Sink 각각 application의 기능을 구성하고 나면 그 사이에 데이터를 주고받기 위한 Message Broker 구성을 설정해야 한다.

Spring Cloud DataFlow를 사용하면 Message Broker에 대한 설정이 간소화된다.

Source의 경우를 예를 들면 다음처럼 Supplier bean을 등록하고

@Slf4j
@Configuration
public class SourceConfig {

    @Bean
    Supplier<String> message() {
        log.debug("source called");
        return () -> "dfd";
    }
}

해당 이름의 output 설정을 properties에 하면 된다.

# message is supplier function name
 spring.cloud.stream.function.bindings.message-out-0=output

message 부분은 등록된 Supplier bean의 이름이고 out 을 한다는 의미의 중간값, index 를 의미하는 마지막 값으로 구성된 이름으로 Source 에는 output에 대한 설정이 필요하고 Processor 에는 input과 output, Sink 에는 input에 대한 설정이 필요하며 각각 요청을 주고받을 수 있도록 값을 맞춰주어야 한다.

SourceSupplier bean을, ProcessorFunction bean을, SinkConsumer bean을 등록하고 각각 properties의 설정을 하면 된다.

이렇게 구성된 message 설정에 대해 message broker에 알아서 생성해 주고 데이터를 주고받게 처리를 해준다.

Prometheus 및 Grafana Metric 지원

Spring Cloud DataFlow를 통해 배포한 app들에 대한 Prometheus metric 수집을 지원하여 이를 Grafana 를 통해 확인할 수 있다.

Spring Cloud DataFlow 설치

Spring Cloud DataFlow를 구성하기 위해 사용하는 것들은 다음과 같다.

  • message broker : kafka 또는 rabbitmq
  • database : mariadb 또는 postgresql
  • Spring Cloud Skipper (with Spring Cloud Deployer)
  • Spring Cloud DataFlow Server
  • (optional) prometheus, grafana

따라서 Spring Cloud DataFlow를 사용하기 위해서는 사전에 message broker와 database가 설치가 되어 있어야 하고 Spring Cloud Skipper와 Spring Cloud DataFlow Server를 설치해야 한다.

설치에 관해서는 Spring Cloud DataFlow 문서를 참고하면 된다.

https://dataflow.spring.io/docs/installation/

https://dataflow.spring.io/docs/installation/kubernetes/kubectl/

다만 현재 기준의 문서의 가이드 대로 해보니 설치가 진행되지 않았다.

github에 kubernetes의 설치 관련 yaml을 제공해 주는데 main branch에는 관련 파일이 없기 때문에 2.10.x branch를 참고하였다.

https://github.com/spring-cloud/spring-cloud-dataflow/blob/2.10.x/src/local/k8s/yaml

설치를 진행하면서 겪은 문제들은 다음과 같다.

DB 버전 호환성 체크 필요

우선 database의 경우 Spring Cloud Skipper와 Spring Cloud DataFlow Server에서 사용한다.

Spring Cloud Skipper나 Spring Cloud DataFlow Server가 내부적으로 Flyway 를 사용하는데 지원하는 MariaDB 버전과 맞지 않으면 사용할 수 없었다.

https://flywaydb.org/documentation/database/mariadb

문서상으론 10.2부터 10.6까지 지원한다고 되어있다.

Cloud Platform 호환성 체크 필요

Spring Cloud Skipper가 Spring Cloud Deployer를 통해 배포를 하기 때문에 지원하는 버전이 맞아야 한다.

https://github.com/spring-cloud/spring-cloud-deployer-kubernetes

기존 등록된 secret을 serviceaccount에 imagePullSecrets으로 추가해야 함

https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/

kubernetes 배포 관련하여 권한 처리를 위해 imagePullSecrets 설정을 해야 한다.

예를 들어 다음과 같은 Secret을 등록하여 private registry에서 pull image를 사용하고 있었다면

kubectl create secret docker-registry regcred --docker-server=<your-registry-server> --docker-username=<your-name> --docker-password=<your-pword> --docker-email=<your-email>

해당 secret을 skipper에 설정한 serviceaccount에 사용하도록 추가해야 한다.

아래 명령으로 에디터를 열고

kubectl edit serviceaccount/default

관련 설정을 추가해야 한다.

imagePullSecrets:
  - name: regcred

설치 관련 kubernetes yaml 파일

kubernetes 설치 관련 yaml 들은 다음과 같다.

pv, pvc나 secret, ingress 설정은 생략하였다.

4가지 항목의 설치에 대한 참고 정도로 보면 될 것 같다.

rabbitmq

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq-deployment
  labels:
    app: rabbitmq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
      - name: rabbitmq
        image: rabbitmq:3-management
        ports:
        - containerPort: 5672
        - containerPort: 15672
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
      volumes:
      - name: rabbitmq-data
        persistentVolumeClaim:
          claimName: rabbitmq-data-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-service
spec:
  selector:
    app: rabbitmq
  ports:
  - protocol: TCP
    name: rabbitmq-port
    port: 5672
    targetPort: 5672
  - protocol: TCP
    name: rabbitmqmanagement-port
    port: 15672
    targetPort: 15672

postgres

apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres-deployment
  labels:
    app: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:14.8
        ports:
        - containerPort: 5432
        env:
        - name: POSTGRES_USER
          valueFrom:
            secretKeyRef:
              key: postgres-user
              name: postgres-secret
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              key: postgres-password
              name: postgres-secret
        volumeMounts:
        - name: postgres-volume
          mountPath: /var/lib/postgresql/data
      volumes:
      - name: postgres-volume
        persistentVolumeClaim:
          claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: postgres-service
spec:
  selector:
    app: postgres
  ports:
  - protocol: TCP
    port: 5432
    targetPort: 5432
  type: LoadBalancer

Spring Cloud Skipper

apiVersion: v1
kind: ConfigMap
metadata:
  name: skipper-configmap
  labels:
    app: skipper
data:
  application-kubernetes.yaml: |-
    logging:
      level:
        root: info
        org.springframework.cloud: debug
        io.fabric8: debug
    spring:
      output:
        ansi:
          enabled: NEVER
      cloud:
        deployer:
          kubernetes:
            imagePullPolicy: Always
            imagePullSecret:
              -name: regcred
        skipper:
          server:
            platform:
              kubernetes:
                accounts:
                  default:
                    imagePullPolicy: Always
                    imagePullSecret:
                      -name: regcred
                    environmentVariables: 'LANG=en_US.utf8,LC_ALL=en_US.utf8,JDK_JAVA_OPTIONS=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8,SPRING_CLOUD_CONFIG_ENABLED=false,SPRING_RABBITMQ_HOST=rabbitmq-service,SPRING_RABBITMQ_PORT=5672'
                    request:
                      memory: 1024Mi
                      cpu: 1.0
                    readinessProbeDelay: 1
                    readinessProbeTimeout: 5
                    livenessProbeDelay: 1
                    livenessProbeTimeout: 2
                    startupProbeDelay: 20
                    startupProbeTimeout: 5
                    startupProbeFailure: 50
    management:
      defaults:
        metrics:
          export:
            enabled: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: skipper-deployment
  labels:
    app: skipper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: skipper
  template:
    metadata:
      labels:
        app: skipper
    spec:
      containers:
      - name: skipper
        image: springcloud/spring-cloud-skipper-server:2.11.0-SNAPSHOT
        volumeMounts:
          - name: config
            mountPath: /workspace/config
            readOnly: true
        ports:
        - containerPort: 7577
          name: http-skipper
        - containerPort: 8000
          name: jdwp-skipper
        livenessProbe:
          httpGet:
            path: /actuator/health
            port: 7577
          initialDelaySeconds: 1
        readinessProbe:
          httpGet:
            path: /actuator/info
            port: 7577
          initialDelaySeconds: 1
        startupProbe:
          tcpSocket:
            port: 7577
          failureThreshold: 10
          timeoutSeconds: 2
          initialDelaySeconds: 50
          periodSeconds: 3
        resources:
          requests:
            cpu: 0.5
            memory: 640Mi
        env:
          - name: LANG
            value: en_US.utf8
          - name: LC_ALL
            value: en_US.utf8
          - name: JDK_JAVA_OPTIONS
            value: '-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8'
          - name: SPRING_PROFILES_ACTIVE
            value: 'kubernetes'
          - name: SERVER_PORT
            value: '7577'
          - name: SPRING_CLOUD_CONFIG_ENABLED
            value: 'false'
          - name: SPRING_CLOUD_KUBERNETES_CONFIG_ENABLE_API
            value: 'false'
          - name: SPRING_CLOUD_KUBERNETES_SECRETS_ENABLE_API
            value: 'false'
          - name: SPRING_CLOUD_KUBERNETES_SECRETS_PATHS
            value: /etc/secrets
          - name: SPRING_DATASOURCE_USERNAME
            valueFrom:
              secretKeyRef:
                key: postgres-user
                name: postgres-secret
          - name: SPRING_DATASOURCE_PASSWORD
            valueFrom:
              secretKeyRef:
                key: postgres-password
                name: postgres-secret
          - name: SPRING_DATASOURCE_URL
            value: 'jdbc:postgresql://postgres-service:5432/spring_skipper'
          - name: SPRING_DATASOURCE_DRIVER_CLASS_NAME
            value: 'org.postgresql.Driver'
          - name: SPRING_DATASOURCE_TEST_ON_BORROW
            value: 'true'
          - name: SPRING_DATASOURCE_VALIDATION_QUERY
            value: 'SELECT 1'
          - name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_USE_UNICODE
            value: 'true'
          - name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_CHARACTER_ENCODING
            value: 'UTF-8'
      volumes:
        - name: config
          configMap:
            name: skipper-configmap
            items:
            - key: 'application-kubernetes.yaml'
              path: 'application-kubernetes.yaml'
---
apiVersion: v1
kind: Service
metadata:
  name: skipper-service
spec:
  selector:
    app: skipper
  ports:
  - protocol: TCP
    name: skipper-http
    port: 7577
    targetPort: 7577
  - protocol: TCP
    name: skipper-jdwp
    port: 8000
    targetPort: 8000

Spring Cloud DataFlow Server

apiVersion: v1
kind: ConfigMap
metadata:
  name: scdf-configmap
  labels:
    app: scdf
data:
  application-kubernetes.yaml: |-
    logging:
      level:
        root: info
        org.springframework: debug
    management:
      defaults:
        metrics:
          export:
            enabled: false
    spring:
      output:
        ansi:
          enabled: NEVER
      cloud:
        deployer:
          kubernetes:
            imagePullPolicy: Always
            imagePullSecret: regcred
        dataflow:
          metrics.dashboard:
            url: 'http://scdf.bluesky.local'
          task:
            platform:
              kubernetes:
                accounts:
                  default:
                    imagePullPolicy: Always
                    imagePullSecret: regcred
                    limits:
                      memory: 1024Mi
    maven:
      remote-repositories:
        repo1:
          url: http://nexus-service:8082/repository/maven-public
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scdf-deployment
  labels:
    app: scdf
spec:
  selector:
    matchLabels:
      app: scdf
  replicas: 1
  template:
    metadata:
      labels:
        app: scdf
    spec:
      containers:
        - name: scdf
          image: springcloud/spring-cloud-dataflow-server:2.10.4-SNAPSHOT
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: config
              mountPath: /workspace/config
              readOnly: true
          ports:
            - containerPort: 9393
              name: http
          livenessProbe:
            httpGet:
              path: /management/health
              port: 9393
            initialDelaySeconds: 1
          readinessProbe:
            httpGet:
              path: /management/info
              port: 9393
            initialDelaySeconds: 1
          startupProbe:
            tcpSocket:
              port: 9393
            failureThreshold: 10
            initialDelaySeconds: 50
            timeoutSeconds: 2
            periodSeconds: 3
          resources:
            requests:
              cpu: 0.5
              memory: 1024Mi
          env:
            - name: LANG
              value: en_US.utf8
            - name: LC_ALL
              value: en_US.utf8
            - name: JDK_JAVA_OPTIONS
              value: '-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8'
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: "metadata.namespace"
            - name: SPRING_PROFILES_ACTIVE
              value: 'kubernetes'
            - name: SERVER_PORT
              value: '9393'
            - name: SPRING_CLOUD_CONFIG_ENABLED
              value: 'false'
            - name: SPRING_CLOUD_DATAFLOW_FEATURES_ANALYTICS_ENABLED
              value: 'true'
            - name: SPRING_CLOUD_DATAFLOW_FEATURES_SCHEDULES_ENABLED
              value: 'true'
            - name: SPRING_CLOUD_DATAFLOW_TASK_COMPOSEDTASKRUNNER_URI
              value: 'docker://springcloud/spring-cloud-dataflow-composed-task-runner:2.10.4-SNAPSHOT'
            - name: SPRING_CLOUD_KUBERNETES_CONFIG_ENABLE_API
              value: 'false'
            - name: SPRING_CLOUD_KUBERNETES_SECRETS_ENABLE_API
              value: 'false'
            - name: SPRING_CLOUD_KUBERNETES_SECRETS_PATHS
              value: /etc/secrets
            - name: SPRING_CLOUD_DATAFLOW_SERVER_URI
              value: 'http://scdf-service:9393'
              # Provide the Skipper service location
            - name: SPRING_CLOUD_SKIPPER_CLIENT_SERVER_URI
              value: 'http://skipper-service:7577/api'
              # Add Maven repo for metadata artifact resolution for all stream apps
            - name: SPRING_APPLICATION_JSON
              value: "{ \"maven\": { \"local-repository\": null, \"remote-repositories\": { \"central\": { \"url\": \"https://repo.maven.apache.org/maven2\" }, \"repo1\": { \"url\": \"https://repo.spring.io/snapshot\"} } } }"
            - name: SPRING_DATASOURCE_USERNAME
              valueFrom:
                secretKeyRef:
                  key: postgres-user
                  name: postgres-secret
            - name: SPRING_DATASOURCE_PASSWORD
              valueFrom:
                secretKeyRef:
                  key: postgres-password
                  name: postgres-secret
            - name: SPRING_DATASOURCE_URL
              value: 'jdbc:postgresql://postgres-service:5432/spring_skipper'
            - name: SPRING_DATASOURCE_DRIVER_CLASS_NAME
              value: 'org.postgresql.Driver'
            - name: SPRING_DATASOURCE_TEST_ON_BORROW
              value: 'true'
            - name: SPRING_DATASOURCE_VALIDATION_QUERY
              value: 'SELECT 1'
            - name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_USE_UNICODE
              value: 'true'
            - name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_CHARACTER_ENCODING
              value: 'UTF-8'
      volumes:
        - name: config
          configMap:
            name: scdf-configmap
            items:
              - key: 'application-kubernetes.yaml'
                path: 'application-kubernetes.yaml'
---
kind: Service
apiVersion: v1
metadata:
  name: scdf-service
  labels:
    app: scdf
spec:
  selector:
    app: scdf
  ports:
    - port: 9393
      name: scdf-server
반응형