예제 및 사용 사례가 있는 Apache Spark 소개
게시 됨: 2022-03-11Spark가 작성되는 언어인 Scala에 관심을 갖게 된 2013년 말에 Spark에 대해 처음 들었습니다. 얼마 후 나는 타이타닉호의 생존을 예측하는 재미있는 데이터 과학 프로젝트를 했습니다. 이것은 Spark 개념과 프로그래밍에 대해 더 자세히 소개할 수 있는 좋은 방법임이 밝혀졌습니다. 시작할 곳을 찾고 있는 Spark 개발자 지망생에게 적극 권장합니다.
오늘날 Spark는 Amazon, eBay 및 Yahoo!와 같은 주요 업체에서 채택하고 있습니다. 많은 조직에서 수천 개의 노드가 있는 클러스터에서 Spark를 실행합니다. Spark FAQ에 따르면 알려진 가장 큰 클러스터에는 8000개 이상의 노드가 있습니다. 실제로 Spark는 주목할 가치가 있고 배울 가치가 있는 기술입니다.
이 문서에서는 사용 사례 및 예제를 포함하여 Spark에 대한 소개를 제공합니다. 여기에는 Apache Spark 웹 사이트의 정보와 Learning Spark - Lightning-Fast Big Data Analysis 책의 정보가 포함되어 있습니다.
아파치 스파크란? 소개
Spark는 "lightning fast cluster computing"으로 광고되는 Apache 프로젝트입니다. 그것은 번성하는 오픈 소스 커뮤니티를 가지고 있으며 현재 가장 활발한 Apache 프로젝트입니다.
Spark는 보다 빠르고 일반적인 데이터 처리 플랫폼을 제공합니다. Spark를 사용하면 Hadoop보다 메모리에서 최대 100배 더 빠르게 또는 디스크에서 10배 더 빠르게 프로그램을 실행할 수 있습니다. 작년에 Spark는 10분의 1의 머신 수에서 3배 더 빠른 100TB Daytona GraySort 대회를 완료하여 Hadoop을 인수했으며 또한 페타바이트를 분류하는 가장 빠른 오픈 소스 엔진이 되었습니다.
또한 Spark를 사용하면 80개 이상의 상위 수준 연산자를 마음대로 사용할 수 있으므로 코드를 더 빠르게 작성할 수 있습니다. 이를 설명하기 위해 "Hello World!"를 살펴보겠습니다. BigData: 단어 개수 예시. MapReduce를 위해 Java로 작성되었으며 약 50줄의 코드가 있는 반면 Spark(및 Scala)에서는 다음과 같이 간단하게 수행할 수 있습니다.
sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")
Apache Spark 사용 방법을 배울 때 또 다른 중요한 측면은 바로 사용할 수 있는 REPL(대화형 셸)입니다. REPL을 사용하면 먼저 전체 작업을 코딩하고 실행할 필요 없이 각 코드 라인의 결과를 테스트할 수 있습니다. 따라서 작업 코드에 대한 경로가 훨씬 짧고 임시 데이터 분석이 가능합니다.
Spark의 추가 주요 기능은 다음과 같습니다.
- 현재 Scala, Java 및 Python에서 API를 제공하며 다른 언어(예: R)에 대한 지원도 진행 중입니다.
- Hadoop 에코시스템 및 데이터 소스(HDFS, Amazon S3, Hive, HBase, Cassandra 등)와 잘 통합됩니다.
- Hadoop YARN 또는 Apache Mesos에서 관리하는 클러스터에서 실행할 수 있으며 독립 실행형으로도 실행할 수 있습니다.
Spark 코어는 동일한 애플리케이션에서 원활하게 사용할 수 있는 강력한 고급 라이브러리 세트로 보완됩니다. 이러한 라이브러리에는 현재 SparkSQL, Spark Streaming, MLlib(머신 러닝용) 및 GraphX가 포함되어 있으며 각각은 이 문서에서 더 자세히 설명합니다. 추가 Spark 라이브러리 및 확장도 현재 개발 중입니다.
스파크 코어
Spark Core는 대규모 병렬 및 분산 데이터 처리를 위한 기본 엔진입니다. 다음을 담당합니다.
- 메모리 관리 및 오류 복구
- 클러스터에서 작업 예약, 배포 및 모니터링
- 스토리지 시스템과 상호 작용
Spark는 병렬로 작동할 수 있는 불변의 내결함성 분산 개체 컬렉션인 RDD(Resilient Distributed Dataset)의 개념을 도입했습니다. RDD는 모든 유형의 개체를 포함할 수 있으며 외부 데이터 세트를 로드하거나 드라이버 프로그램에서 컬렉션을 배포하여 생성됩니다.
RDD는 두 가지 유형의 작업을 지원합니다.
- 변환은 RDD에서 수행되고 결과를 포함하는 새 RDD를 생성하는 작업(예: 맵, 필터, 조인, 통합 등)입니다.
- 작업은 RDD에서 계산을 실행한 후 값을 반환하는 작업(예: reduce, count, first 등)입니다.
Spark의 변환은 "지연"입니다. 즉, 결과를 즉시 계산하지 않습니다. 대신 수행할 작업과 작업이 수행될 데이터세트(예: 파일)를 "기억"합니다. 변환은 작업이 호출되고 결과가 드라이버 프로그램에 반환될 때만 실제로 계산됩니다. 이 설계를 통해 Spark를 보다 효율적으로 실행할 수 있습니다. 예를 들어 큰 파일이 다양한 방식으로 변환되어 첫 번째 작업에 전달되면 Spark는 전체 파일에 대한 작업을 수행하지 않고 첫 번째 줄에 대한 결과만 처리하고 반환합니다.
기본적으로 변환된 각 RDD는 작업을 실행할 때마다 다시 계산될 수 있습니다. 그러나 지속 또는 캐시 방법을 사용하여 메모리에 RDD를 유지할 수도 있습니다. 이 경우 Spark는 다음에 쿼리할 때 훨씬 빠른 액세스를 위해 클러스터에 요소를 유지합니다.
스파크SQL
SparkSQL은 SQL 또는 Hive 쿼리 언어를 통해 데이터 쿼리를 지원하는 Spark 구성 요소입니다. Spark 위에서 실행되는 Apache Hive 포트(MapReduce 대신)로 시작되었으며 이제 Spark 스택과 통합되었습니다. 다양한 데이터 소스에 대한 지원을 제공하는 것 외에도 코드 변환으로 SQL 쿼리를 구성할 수 있어 매우 강력한 도구가 됩니다. 다음은 Hive 호환 쿼리의 예입니다.
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
스파크 스트리밍
Spark Streaming은 프로덕션 웹 서버 로그 파일(예: Apache Flume 및 HDFS/S3), Twitter와 같은 소셜 미디어, Kafka와 같은 다양한 메시징 대기열과 같은 스트리밍 데이터의 실시간 처리를 지원합니다. 내부적으로 Spark Streaming은 입력 데이터 스트림을 수신하고 데이터를 일괄 처리로 나눕니다. 다음으로 Spark 엔진에 의해 처리되고 아래와 같이 배치로 최종 결과 스트림을 생성합니다.

Spark Streaming API는 Spark Core의 API와 거의 일치하므로 프로그래머가 일괄 처리 및 스트리밍 데이터의 세계에서 쉽게 작업할 수 있습니다.
MLlib
MLlib는 분류, 회귀, 클러스터링, 협업 필터링 등을 위해 클러스터에서 확장하도록 설계된 다양한 알고리즘을 제공하는 기계 학습 라이브러리입니다(해당 주제에 대한 자세한 내용은 기계 학습에 대한 Toptal의 기사 참조). 이러한 알고리즘 중 일부는 일반 최소 제곱 또는 k-평균 클러스터링(및 추가 예정)을 사용하는 선형 회귀와 같은 스트리밍 데이터에서도 작동합니다. Apache Mahout(Hadoop용 머신 러닝 라이브러리)은 이미 MapReduce에서 손을 떼고 Spark MLlib에 합류했습니다.
그래프X
GraphX는 그래프를 조작하고 그래프 병렬 작업을 수행하기 위한 라이브러리입니다. ETL, 탐색적 분석 및 반복적인 그래프 계산을 위한 균일한 도구를 제공합니다. 그래프 조작을 위한 내장 연산 외에도 PageRank와 같은 일반적인 그래프 알고리즘 라이브러리를 제공합니다.
Apache Spark 사용 방법: 이벤트 감지 사용 사례
이제 “Apache Spark란 무엇입니까?”라는 질문에 답했으므로 가장 효과적으로 사용할 수 있는 문제 또는 과제는 무엇인지 생각해 보겠습니다.
최근 트위터 스트림을 분석하여 지진을 감지하는 실험에 관한 기사를 접했습니다. 흥미롭게도 이 기술은 일본 기상청보다 더 빨리 일본의 지진을 알릴 가능성이 있는 것으로 나타났습니다. 비록 그들이 그들의 기사에서 다른 기술을 사용했지만, 나는 우리가 어떻게 Spark를 단순화된 코드 조각과 함께 사용하고 글루 코드 없이 사용할 수 있는지 보여주는 좋은 예라고 생각합니다.
먼저 "지진" 또는 "흔들림"과 같은 관련성이 있는 것으로 보이는 트윗을 필터링해야 합니다. 다음과 같은 목적으로 Spark Streaming을 쉽게 사용할 수 있습니다.
TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))
그런 다음 트윗에 대해 의미론적 분석을 실행하여 현재 지진 발생을 참조하는 것으로 보이는지 확인해야 합니다. "지진!"과 같은 트윗 예를 들어 "지금 흔들리고 있습니다"는 긍정적인 일치로 간주되지만 "지진 회의에 참석" 또는 "어제 지진은 무서웠어요"와 같은 트윗은 그렇지 않습니다. 이 논문의 저자는 이를 위해 서포트 벡터 머신(SVM)을 사용했습니다. 여기에서도 똑같이 하겠지만 스트리밍 버전을 사용해 볼 수도 있습니다. MLlib의 결과 코드 예제는 다음과 같습니다.
// We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)
모델의 예측률에 만족하면 다음 단계로 넘어갈 수 있고 지진을 발견할 때마다 대응할 수 있습니다. 하나를 감지하려면 정의된 시간 창에서 긍정적인 트윗의 특정 수(즉, 밀도)가 필요합니다(문서에 설명됨). Twitter 위치 서비스가 활성화된 트윗의 경우 지진 위치도 추출합니다. 이 지식으로 무장하면 SparkSQL을 사용하고 기존 Hive 테이블(지진 알림 수신에 관심이 있는 사용자 저장)을 쿼리하여 다음과 같이 이메일 주소를 검색하고 개인화된 경고 이메일을 보낼 수 있습니다.
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)
기타 Apache Spark 사용 사례
Spark의 잠재적 사용 사례는 물론 지진 감지 이상으로 확장됩니다.
다음은 Spark가 매우 적합한 빅 데이터의 속도, 다양성 및 볼륨을 처리해야 하는 다른 사용 사례에 대한 빠른(그러나 완전한 것은 아닙니다!) 샘플링입니다.
게임 산업에서 실시간 게임 내 이벤트의 잠재적 파이어호스에서 패턴을 처리 및 발견하고 즉시 대응할 수 있는 것은 플레이어 유지, 타겟 광고, 자동 -복잡성 수준 조정 등.
전자 상거래 산업에서 실시간 거래 정보는 k-means와 같은 스트리밍 클러스터링 알고리즘 또는 ALS와 같은 협업 필터링으로 전달될 수 있습니다. 그런 다음 결과는 고객 의견이나 제품 리뷰와 같은 다른 비정형 데이터 소스와 결합될 수 있으며, 시간이 지남에 따라 새로운 트렌드에 따라 권장 사항을 지속적으로 개선하고 적용하는 데 사용할 수 있습니다.
금융 또는 보안 산업에서 Spark 스택은 사기 또는 침입 탐지 시스템 또는 위험 기반 인증에 적용될 수 있습니다. 방대한 양의 보관된 로그를 수집하고 데이터 유출 및 손상된 계정에 대한 정보(예: https://haveibeenpwned.com/ 참조) 및 연결/정보와 같은 외부 데이터 소스와 결합하여 최고의 결과를 얻을 수 있습니다. IP 지리적 위치 또는 시간과 같은 요청.
결론
요약하자면, Spark는 정형 및 비정형의 실시간 또는 보관된 데이터를 대량으로 처리하는 어렵고 계산 집약적인 작업을 단순화하고 기계 학습 및 그래프 알고리즘과 같은 관련 복잡한 기능을 원활하게 통합하는 데 도움이 됩니다. Spark는 대중에게 빅 데이터 처리를 제공합니다. 확인 해봐!