Spring Cloud DataFlow 소개
Spring Cloud DataFlow
는 data-processing use case에 중점을 두고 application 개발과 배포를 간소화한다.
이를 위해 여러 Spring의 여러 라이브러리가 조합되어 기능을 제공한다.
간략하게 소개하면 Spring Cloud DataFlow
는 Spring Integration
과 Spring Batch
로 작성된 Spring Application을 등록하고 Cloud Platform 배포를 관리하는 도구이다.
Spring Cloud Dataflow는 두 가지 방식의 data-processing을 처리한다.
Streams - CDC (Change Data Capture)의 실시간 또는 준 실시간 데이터 처리 작업
Spring Integration
을 생각하면 되며 이를 사용한 Spring Cloud Stream
application의 등록 및 배포를 관리한다.
데이터를 가져오고 처리하고 다시 내보내는 과정이 각각 독립적인 application이며 각 단계 사이에 rabbitmq
, kafka
같은 message brocker를 사용하여 데이터를 전달하게 된다.
각 application은 계속 떠 있으며 지속적으로 데이터를 가져와서 가공하고 내보낸다.
각 단계는 Source
, Processor
, Sink
로 부르며 Spring Cloud DataFlow
에서는 각각 개별 application으로 등록하고 이를 연결하여 사용한다.
Task/Jobs - ETL (Extract, Transform, Load)의 일괄 처리 배치 (task) 작업
Spring Batch
를 생각하면 되며 이를 사용한 Spring Cloud Task
application의 등록 및 배포를 관리한다.
batch application이 실행되면 한 번에 대량의 데이터를 가져와 처리하고 내보낸 후 application이 종료되는 형태로 주기적으로 해당 application을 실행하는 형식이다.
batch는 ItemReader
, ItemProcessor
, ItemWriter
로 구성되며 Spring Cloud DataFlow
에서는 Task application으로 등록하여 사용한다.
사용 방법에 대한 간단한 소개
Spring Cloud DataFlow의 설치가 복잡하므로 설치를 소개하기 전에 간단하게 Spring Cloud DataFlow가 어떤 것인지 간략하게 소개해본다.
설치가 된 Spring Cloud DataFlow의 Dashboard를 살펴보면 다음과 같다.
4개의 메뉴가 있는데 간략하게 설명하면 다음과 같다.
Application
: 사용할 Spring Applicaiton 등록 (Stream, Task application을 등록)Stream
: stream 생성 및 배포 관리Tasks / Jobs
: Task 생성 및 배포 관리Manage
: 관리 메뉴
Application 등록
data processing 방식을 사용하기 위해 아래와 같은 Type의 Application을 등록하게 된다.
- Streams
Source
Processor
Sink
- Tasks / Jobs
Task
Applicaton 메뉴에서 Add Application을 선택하고 상황에 맞는 Application을 등록한다.
빌드된 Docker Image
또는 Maven jar
를 등록하여 사용할 수 있다.
(Cloud Platform 배포를 위해 사용하므로 Docker Image를 사용하면 좋을 듯싶다.)
등록 시 Source
, Processor
, Sink
, Task
같은 Type을 지정하여 등록하게 된다.
Application이 등록되면 등록된 항목을 확인할 수 있다.
이렇게 등록된 Application은 Streams
, Task / Jobs
메뉴에서 생성 시 지원하는 Type에 대한 Application을 선택할 수 있게 노출이 된다.
Stream
, Tasks / Jobs
두 가지 방식의 Spring Application 이외의 Application도 App
type으로 등록하여 사용할 수 있다.
Stream 생성 및 배포해 보기
예를 들어 Stream 메뉴에서 create stream을 선택하면
다음과 같이 Stream
생성 관련 항목을 선택할 수 있게 표시된다.
각 단계에 맞는 application을 선택하고 우측으로 드래그하여 배치하고 각 application 사이를 연결하면 된다.
Source
나 Sink
는 필수이고 중간 과정인 Processor
는 사용하지 않을 수도 있다.
우측 화면에서 사용하고자 하는 Source
, Processor
, Sink
를 배치한 후 각 단계 사이를 연결해 주고 하단의 Create Stream
을 선택하면 생성할 Stream의 이름을 지정하는 레이어가 뜬다.
생성을 하면 목록에 생성한 Stream이 표시된다.
해당 Stream을 선택하면 해당 Stream에 대한 정보와 배포 버튼을 확인할 수 있다.
상단의 Deploy Stream
버튼을 누르면 배포할 환경과 배포 시 설정할 값들을 추가할 수 있는 항목이 뜬다.
배포를 하면 설정한 cloud platform에 배포가 된다.
bluesky-study-scdf-bluesky-study-scdf-source-http-v22-86d8w6wgw 1/1 Running 0 39s
bluesky-study-scdf-bluesky-study-scdf-sink-log-v22-568597chzdg9 1/1 Running 0 39s
Stream의 경우 앞서 말한 것처럼 Source
, Processor
, Sink
가 각각 개별 application으로 구성되기 때문에 pod도 각각 구성된다.
하지만 Spring Cloud DataFlow
에서는 설정된 하나의 Stream에서 Source
, Processor
, Sink
에 대한 배포를 묶어서 관리한다.
배포를 하고 나면 Stream 정보를 다시 확인하면 다음과 같이 변경된 것을 확인할 수 있다.
배포 설정을 변경하거나 배포된 pod의 수를 변경하거나 배포를 다시 취소할 수 있으며 각각의 pod의 로그를 확인할 수 있다.
그보다 더 아래쪽에는 배포 히스토리를 확인할 수 있고 이전 버전의 배포로 롤백을 할 수도 있다.
또한 Stream의 Runtime 항목에서 현재 배포된 Stream이 무엇이 있는지 확인할 수 있다.
Stream 구성 추가 설명
하나의 Stream을 등록하고 배포하는 것까지 살펴보았는데 만약 Stream을 생성할 때 Source
, Processor
, Sink
중 동일 타입을 분기하여 2개 이상 설정하면 분기된 만큼 별도의 Stream이 생성된다.
분기된 Stream에 대해서는 Create Stream
시 이름을 각각 지정하는 입력창이 뜬다.
예를 들어 아래처럼 Stream을 구성하면
Create Stream 시 2개의 Stream의 이름을 지정하라는 레이어가 뜬다.
즉 Source
, Processor
, Sink
하나씩이 하나의 Stream 묶음이며 분기가 있으면 Spring Cloud DataFlow는 별도의 Stream을 생성하여 등록한다.
등록 Application의 Spring Cloud DataFlow 설정
application을 등록하고 Stream을 생성하여 배포하는 것까지 Spring Cloud DataFlow가 처리하는 것을 훑어보았다.
등록할 대상 Application을 어떻게 만들어야 하는지에 대한 설명은 하지 않았었다.
Spring Cloud DataFlow를 사용하지 않고 Spring Integration만 사용한다면 Source
, Processor
, Sink
각각 application의 기능을 구성하고 나면 그 사이에 데이터를 주고받기 위한 Message Broker 구성을 설정해야 한다.
Spring Cloud DataFlow를 사용하면 Message Broker에 대한 설정이 간소화된다.
Source의 경우를 예를 들면 다음처럼 Supplier bean을 등록하고
@Slf4j
@Configuration
public class SourceConfig {
@Bean
Supplier<String> message() {
log.debug("source called");
return () -> "dfd";
}
}
해당 이름의 output 설정을 properties에 하면 된다.
# message is supplier function name
spring.cloud.stream.function.bindings.message-out-0=output
message
부분은 등록된 Supplier bean의 이름이고 out
을 한다는 의미의 중간값, index
를 의미하는 마지막 값으로 구성된 이름으로 Source
에는 output에 대한 설정이 필요하고 Processor
에는 input과 output, Sink
에는 input에 대한 설정이 필요하며 각각 요청을 주고받을 수 있도록 값을 맞춰주어야 한다.
Source
는 Supplier
bean을, Processor
는 Function
bean을, Sink
는 Consumer
bean을 등록하고 각각 properties의 설정을 하면 된다.
이렇게 구성된 message 설정에 대해 message broker에 알아서 생성해 주고 데이터를 주고받게 처리를 해준다.
Prometheus 및 Grafana Metric 지원
Spring Cloud DataFlow를 통해 배포한 app들에 대한 Prometheus
metric 수집을 지원하여 이를 Grafana
를 통해 확인할 수 있다.
Spring Cloud DataFlow 설치
Spring Cloud DataFlow를 구성하기 위해 사용하는 것들은 다음과 같다.
- message broker : kafka 또는 rabbitmq
- database : mariadb 또는 postgresql
- Spring Cloud Skipper (with Spring Cloud Deployer)
- Spring Cloud DataFlow Server
- (optional) prometheus, grafana
따라서 Spring Cloud DataFlow를 사용하기 위해서는 사전에 message broker와 database가 설치가 되어 있어야 하고 Spring Cloud Skipper와 Spring Cloud DataFlow Server를 설치해야 한다.
설치에 관해서는 Spring Cloud DataFlow 문서를 참고하면 된다.
https://dataflow.spring.io/docs/installation/
https://dataflow.spring.io/docs/installation/kubernetes/kubectl/
다만 현재 기준의 문서의 가이드 대로 해보니 설치가 진행되지 않았다.
github에 kubernetes의 설치 관련 yaml을 제공해 주는데 main branch에는 관련 파일이 없기 때문에 2.10.x branch를 참고하였다.
https://github.com/spring-cloud/spring-cloud-dataflow/blob/2.10.x/src/local/k8s/yaml
설치를 진행하면서 겪은 문제들은 다음과 같다.
DB 버전 호환성 체크 필요
우선 database의 경우 Spring Cloud Skipper와 Spring Cloud DataFlow Server에서 사용한다.
Spring Cloud Skipper나 Spring Cloud DataFlow Server가 내부적으로 Flyway
를 사용하는데 지원하는 MariaDB
버전과 맞지 않으면 사용할 수 없었다.
https://flywaydb.org/documentation/database/mariadb
문서상으론 10.2부터 10.6까지 지원한다고 되어있다.
Cloud Platform 호환성 체크 필요
Spring Cloud Skipper가 Spring Cloud Deployer를 통해 배포를 하기 때문에 지원하는 버전이 맞아야 한다.
https://github.com/spring-cloud/spring-cloud-deployer-kubernetes
기존 등록된 secret을 serviceaccount에 imagePullSecrets으로 추가해야 함
https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/
kubernetes 배포 관련하여 권한 처리를 위해 imagePullSecrets
설정을 해야 한다.
예를 들어 다음과 같은 Secret을 등록하여 private registry에서 pull image를 사용하고 있었다면
kubectl create secret docker-registry regcred --docker-server=<your-registry-server> --docker-username=<your-name> --docker-password=<your-pword> --docker-email=<your-email>
해당 secret을 skipper에 설정한 serviceaccount에 사용하도록 추가해야 한다.
아래 명령으로 에디터를 열고
kubectl edit serviceaccount/default
관련 설정을 추가해야 한다.
imagePullSecrets:
- name: regcred
설치 관련 kubernetes yaml 파일
kubernetes 설치 관련 yaml 들은 다음과 같다.
pv, pvc나 secret, ingress 설정은 생략하였다.
4가지 항목의 설치에 대한 참고 정도로 보면 될 것 같다.
rabbitmq
apiVersion: apps/v1
kind: Deployment
metadata:
name: rabbitmq-deployment
labels:
app: rabbitmq
spec:
replicas: 1
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:3-management
ports:
- containerPort: 5672
- containerPort: 15672
volumeMounts:
- name: rabbitmq-data
mountPath: /var/lib/rabbitmq
volumes:
- name: rabbitmq-data
persistentVolumeClaim:
claimName: rabbitmq-data-pvc
---
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-service
spec:
selector:
app: rabbitmq
ports:
- protocol: TCP
name: rabbitmq-port
port: 5672
targetPort: 5672
- protocol: TCP
name: rabbitmqmanagement-port
port: 15672
targetPort: 15672
postgres
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres-deployment
labels:
app: postgres
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:14.8
ports:
- containerPort: 5432
env:
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
key: postgres-user
name: postgres-secret
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
key: postgres-password
name: postgres-secret
volumeMounts:
- name: postgres-volume
mountPath: /var/lib/postgresql/data
volumes:
- name: postgres-volume
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
name: postgres-service
spec:
selector:
app: postgres
ports:
- protocol: TCP
port: 5432
targetPort: 5432
type: LoadBalancer
Spring Cloud Skipper
apiVersion: v1
kind: ConfigMap
metadata:
name: skipper-configmap
labels:
app: skipper
data:
application-kubernetes.yaml: |-
logging:
level:
root: info
org.springframework.cloud: debug
io.fabric8: debug
spring:
output:
ansi:
enabled: NEVER
cloud:
deployer:
kubernetes:
imagePullPolicy: Always
imagePullSecret:
-name: regcred
skipper:
server:
platform:
kubernetes:
accounts:
default:
imagePullPolicy: Always
imagePullSecret:
-name: regcred
environmentVariables: 'LANG=en_US.utf8,LC_ALL=en_US.utf8,JDK_JAVA_OPTIONS=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8,SPRING_CLOUD_CONFIG_ENABLED=false,SPRING_RABBITMQ_HOST=rabbitmq-service,SPRING_RABBITMQ_PORT=5672'
request:
memory: 1024Mi
cpu: 1.0
readinessProbeDelay: 1
readinessProbeTimeout: 5
livenessProbeDelay: 1
livenessProbeTimeout: 2
startupProbeDelay: 20
startupProbeTimeout: 5
startupProbeFailure: 50
management:
defaults:
metrics:
export:
enabled: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: skipper-deployment
labels:
app: skipper
spec:
replicas: 1
selector:
matchLabels:
app: skipper
template:
metadata:
labels:
app: skipper
spec:
containers:
- name: skipper
image: springcloud/spring-cloud-skipper-server:2.11.0-SNAPSHOT
volumeMounts:
- name: config
mountPath: /workspace/config
readOnly: true
ports:
- containerPort: 7577
name: http-skipper
- containerPort: 8000
name: jdwp-skipper
livenessProbe:
httpGet:
path: /actuator/health
port: 7577
initialDelaySeconds: 1
readinessProbe:
httpGet:
path: /actuator/info
port: 7577
initialDelaySeconds: 1
startupProbe:
tcpSocket:
port: 7577
failureThreshold: 10
timeoutSeconds: 2
initialDelaySeconds: 50
periodSeconds: 3
resources:
requests:
cpu: 0.5
memory: 640Mi
env:
- name: LANG
value: en_US.utf8
- name: LC_ALL
value: en_US.utf8
- name: JDK_JAVA_OPTIONS
value: '-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8'
- name: SPRING_PROFILES_ACTIVE
value: 'kubernetes'
- name: SERVER_PORT
value: '7577'
- name: SPRING_CLOUD_CONFIG_ENABLED
value: 'false'
- name: SPRING_CLOUD_KUBERNETES_CONFIG_ENABLE_API
value: 'false'
- name: SPRING_CLOUD_KUBERNETES_SECRETS_ENABLE_API
value: 'false'
- name: SPRING_CLOUD_KUBERNETES_SECRETS_PATHS
value: /etc/secrets
- name: SPRING_DATASOURCE_USERNAME
valueFrom:
secretKeyRef:
key: postgres-user
name: postgres-secret
- name: SPRING_DATASOURCE_PASSWORD
valueFrom:
secretKeyRef:
key: postgres-password
name: postgres-secret
- name: SPRING_DATASOURCE_URL
value: 'jdbc:postgresql://postgres-service:5432/spring_skipper'
- name: SPRING_DATASOURCE_DRIVER_CLASS_NAME
value: 'org.postgresql.Driver'
- name: SPRING_DATASOURCE_TEST_ON_BORROW
value: 'true'
- name: SPRING_DATASOURCE_VALIDATION_QUERY
value: 'SELECT 1'
- name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_USE_UNICODE
value: 'true'
- name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_CHARACTER_ENCODING
value: 'UTF-8'
volumes:
- name: config
configMap:
name: skipper-configmap
items:
- key: 'application-kubernetes.yaml'
path: 'application-kubernetes.yaml'
---
apiVersion: v1
kind: Service
metadata:
name: skipper-service
spec:
selector:
app: skipper
ports:
- protocol: TCP
name: skipper-http
port: 7577
targetPort: 7577
- protocol: TCP
name: skipper-jdwp
port: 8000
targetPort: 8000
Spring Cloud DataFlow Server
apiVersion: v1
kind: ConfigMap
metadata:
name: scdf-configmap
labels:
app: scdf
data:
application-kubernetes.yaml: |-
logging:
level:
root: info
org.springframework: debug
management:
defaults:
metrics:
export:
enabled: false
spring:
output:
ansi:
enabled: NEVER
cloud:
deployer:
kubernetes:
imagePullPolicy: Always
imagePullSecret: regcred
dataflow:
metrics.dashboard:
url: 'http://scdf.bluesky.local'
task:
platform:
kubernetes:
accounts:
default:
imagePullPolicy: Always
imagePullSecret: regcred
limits:
memory: 1024Mi
maven:
remote-repositories:
repo1:
url: http://nexus-service:8082/repository/maven-public
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: scdf-deployment
labels:
app: scdf
spec:
selector:
matchLabels:
app: scdf
replicas: 1
template:
metadata:
labels:
app: scdf
spec:
containers:
- name: scdf
image: springcloud/spring-cloud-dataflow-server:2.10.4-SNAPSHOT
imagePullPolicy: IfNotPresent
volumeMounts:
- name: config
mountPath: /workspace/config
readOnly: true
ports:
- containerPort: 9393
name: http
livenessProbe:
httpGet:
path: /management/health
port: 9393
initialDelaySeconds: 1
readinessProbe:
httpGet:
path: /management/info
port: 9393
initialDelaySeconds: 1
startupProbe:
tcpSocket:
port: 9393
failureThreshold: 10
initialDelaySeconds: 50
timeoutSeconds: 2
periodSeconds: 3
resources:
requests:
cpu: 0.5
memory: 1024Mi
env:
- name: LANG
value: en_US.utf8
- name: LC_ALL
value: en_US.utf8
- name: JDK_JAVA_OPTIONS
value: '-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8'
- name: KUBERNETES_NAMESPACE
valueFrom:
fieldRef:
fieldPath: "metadata.namespace"
- name: SPRING_PROFILES_ACTIVE
value: 'kubernetes'
- name: SERVER_PORT
value: '9393'
- name: SPRING_CLOUD_CONFIG_ENABLED
value: 'false'
- name: SPRING_CLOUD_DATAFLOW_FEATURES_ANALYTICS_ENABLED
value: 'true'
- name: SPRING_CLOUD_DATAFLOW_FEATURES_SCHEDULES_ENABLED
value: 'true'
- name: SPRING_CLOUD_DATAFLOW_TASK_COMPOSEDTASKRUNNER_URI
value: 'docker://springcloud/spring-cloud-dataflow-composed-task-runner:2.10.4-SNAPSHOT'
- name: SPRING_CLOUD_KUBERNETES_CONFIG_ENABLE_API
value: 'false'
- name: SPRING_CLOUD_KUBERNETES_SECRETS_ENABLE_API
value: 'false'
- name: SPRING_CLOUD_KUBERNETES_SECRETS_PATHS
value: /etc/secrets
- name: SPRING_CLOUD_DATAFLOW_SERVER_URI
value: 'http://scdf-service:9393'
# Provide the Skipper service location
- name: SPRING_CLOUD_SKIPPER_CLIENT_SERVER_URI
value: 'http://skipper-service:7577/api'
# Add Maven repo for metadata artifact resolution for all stream apps
- name: SPRING_APPLICATION_JSON
value: "{ \"maven\": { \"local-repository\": null, \"remote-repositories\": { \"central\": { \"url\": \"https://repo.maven.apache.org/maven2\" }, \"repo1\": { \"url\": \"https://repo.spring.io/snapshot\"} } } }"
- name: SPRING_DATASOURCE_USERNAME
valueFrom:
secretKeyRef:
key: postgres-user
name: postgres-secret
- name: SPRING_DATASOURCE_PASSWORD
valueFrom:
secretKeyRef:
key: postgres-password
name: postgres-secret
- name: SPRING_DATASOURCE_URL
value: 'jdbc:postgresql://postgres-service:5432/spring_skipper'
- name: SPRING_DATASOURCE_DRIVER_CLASS_NAME
value: 'org.postgresql.Driver'
- name: SPRING_DATASOURCE_TEST_ON_BORROW
value: 'true'
- name: SPRING_DATASOURCE_VALIDATION_QUERY
value: 'SELECT 1'
- name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_USE_UNICODE
value: 'true'
- name: SPRING_DATASOURCE_HIKARI_DATA_SOURCE_PROPERTIES_CHARACTER_ENCODING
value: 'UTF-8'
volumes:
- name: config
configMap:
name: scdf-configmap
items:
- key: 'application-kubernetes.yaml'
path: 'application-kubernetes.yaml'
---
kind: Service
apiVersion: v1
metadata:
name: scdf-service
labels:
app: scdf
spec:
selector:
app: scdf
ports:
- port: 9393
name: scdf-server
'Study > Java' 카테고리의 다른 글
[troubleshooting] Spring Boot 테스트 코드 실행 시 java.lang.NoClassDefFoundError: io/micrometer/context/ThreadLocalAccessor 에러 (0) | 2023.08.18 |
---|---|
annotation 을 가진 class 검색에 reflection util 대신 Spring TypeFilter 사용으로 대체하기 (0) | 2023.07.31 |
Spring Boot Condition Evaluation Report 확인하기 (0) | 2023.07.16 |
STS Newer patch version of Spring Boot available warning disable 처리하기 (0) | 2023.07.10 |
Jackson ObjectMapper 특정 요청에 대해서만 jsonIgnore 처리하여 응답하기 (0) | 2023.07.05 |
[troubleshooting] Spring Boot 3.1.0에서 hibernate cannot be cast 오류 발생 (0) | 2023.06.21 |
STS 4.19.0 Release 소식 및 Windows 11에서 Windows Defender 예외 처리 하기 (0) | 2023.06.16 |
Spring JDBC AbstractRoutingDataSource, DelegatingDataSource 사용해 보기 (0) | 2023.06.15 |
Spring Boot 프로젝트 properties 암복호화 처리 구현하기 (1) | 2023.06.11 |
Spring Boot 3.1 Release Notes (0) | 2023.05.26 |