Apache Arrow是是各種大數(shù)據(jù)工具(包括BigQuery)使用的一種流行格式,它是平面和分層數(shù)據(jù)的存儲格式。它是一種加快應(yīng)用程序內(nèi)存密集型。
數(shù)據(jù)處理和數(shù)據(jù)科學(xué)領(lǐng)域中的常用庫: Apache Arrow 。諸如Apache Parquet,Apache Spark,pandas之類的開放源代碼項(xiàng)目以及許多商業(yè)或封閉源代碼服務(wù)都使用Arrow。它提供以下功能:
- 內(nèi)存計(jì)算
- 標(biāo)準(zhǔn)化的柱狀存儲格式
- 一個(gè)IPC和RPC框架,分別用于進(jìn)程和節(jié)點(diǎn)之間的數(shù)據(jù)交換
讓我們看一看在Arrow出現(xiàn)之前事物是如何工作的:
我們可以看到,為了使Spark從Parquet文件中讀取數(shù)據(jù),我們需要以Parquet格式讀取和反序列化數(shù)據(jù)。這要求我們通過將數(shù)據(jù)加載到內(nèi)存中來制作數(shù)據(jù)的完整副本。首先,我們將數(shù)據(jù)讀入內(nèi)存緩沖區(qū),然后使用Parquet的轉(zhuǎn)換方法將數(shù)據(jù)(例如字符串或數(shù)字)轉(zhuǎn)換為我們的編程語言的表示形式。這是必需的,因?yàn)镻arquet表示的數(shù)字與Python編程語言表示的數(shù)字不同。
由于許多原因,這對于性能來說是一個(gè)很大的問題:
- 我們正在復(fù)制數(shù)據(jù)并在其上運(yùn)行轉(zhuǎn)換步驟。數(shù)據(jù)的格式不同,我們需要對所有數(shù)據(jù)進(jìn)行讀取和轉(zhuǎn)換,然后再對數(shù)據(jù)進(jìn)行任何計(jì)算。
- 我們正在加載的數(shù)據(jù)必須放入內(nèi)存中。您只有8GB的RAM,數(shù)據(jù)是10GB嗎?你真倒霉!
現(xiàn)在,讓我們看一下Apache Arrow如何改進(jìn)這一點(diǎn):
Arrow無需復(fù)制和轉(zhuǎn)換數(shù)據(jù),而是了解如何直接讀取和操作數(shù)據(jù)。為此,Arrow社區(qū)定義了一種新的文件格式以及直接對序列化數(shù)據(jù)起作用的操作。可以直接從磁盤讀取此數(shù)據(jù)格式,而無需將其加載到內(nèi)存中并轉(zhuǎn)換/反序列化數(shù)據(jù)。當(dāng)然,部分?jǐn)?shù)據(jù)仍將被加載到RAM中,但您的數(shù)據(jù)不必放入內(nèi)存中。Arrow使用其文件的內(nèi)存映射功能,僅在必要和可能的情況下將盡可能多的數(shù)據(jù)加載到內(nèi)存中。
Apache Arrow支持以下語言:
- C++
- C#
- Go
- Java
- JavaScript
- Rust
- Python (through the C++ library)
- Ruby (through the C++ library)
- R (through the C++ library)
- MATLAB (through the C++ library).
Arrow特點(diǎn)
Arrow首先是提供用于內(nèi)存計(jì)算的列式數(shù)據(jù)結(jié)構(gòu)的庫,可以將任何數(shù)據(jù)解壓縮并解碼為Arrow柱狀數(shù)據(jù)結(jié)構(gòu),以便隨后可以對解碼后的數(shù)據(jù)進(jìn)行內(nèi)存內(nèi)分析。Arrow列格式具有一些不錯(cuò)的屬性:隨機(jī)訪問為O(1),每個(gè)值單元格在內(nèi)存中的前一個(gè)和后一個(gè)相鄰,因此進(jìn)行迭代非常有效。
Apache Arrow定義了一種二進(jìn)制“序列化”協(xié)議,用于安排Arrow列數(shù)組的集合(稱為“記錄批處理”),該數(shù)組可用于消息傳遞和進(jìn)程間通信。您可以將協(xié)議放在任何地方,包括磁盤上,以后可以對其進(jìn)行內(nèi)存映射或讀入內(nèi)存并發(fā)送到其他地方。
Arrow協(xié)議的設(shè)計(jì)目的是使您可以“映射”一個(gè)Arrow數(shù)據(jù)塊而不進(jìn)行任何反序列化,因此對磁盤上的Arrow協(xié)議數(shù)據(jù)執(zhí)行分析可以使用內(nèi)存映射并有效地支付零成本。該協(xié)議用于很多事情,例如Spark SQL和Python之間的流數(shù)據(jù),用于針對Spark SQL數(shù)據(jù)塊運(yùn)行pandas函數(shù),這些被稱為“ pandas udfs”。
Arrow是為內(nèi)存而設(shè)計(jì)的(但是您可以將其放在磁盤上,然后再進(jìn)行內(nèi)存映射)。它們旨在相互兼容,并在應(yīng)用程序中一起使用,而其競爭對手Apache Parquet文件是為磁盤存儲而設(shè)計(jì)的。
優(yōu)點(diǎn):Apache Arrow為平面和分層數(shù)據(jù)定義了一種獨(dú)立于語言的列式存儲格式,該格式組織為在CPU和GPU等現(xiàn)代硬件上進(jìn)行高效的分析操作而組織。Arrow存儲器格式還支持零拷貝讀取,以實(shí)現(xiàn)閃電般的數(shù)據(jù)訪問,而無需序列化開銷。
Java的Apache Arrow
導(dǎo)入庫:
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
在開始之前,必須了解對于Arrow的讀/寫操作,使用了字節(jié)緩沖區(qū)。諸如讀取和寫入之類的操作是字節(jié)的連續(xù)交換。為了提高效率,Arrow附帶了一個(gè)緩沖區(qū)分配器,該緩沖區(qū)分配器可以具有一定的大小,也可以具有自動擴(kuò)展功能。支持分配管理的庫是arrow-memory-netty和arrow-memory-unsafe。我們這里使用netty。
用Arrow存儲數(shù)據(jù)需要一個(gè)模式,模式可以通過編程定義:
package com.gkatzioura.arrow;
import java.io.IOException;
import java.util.List;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
public class SchemaFactory {
public static Schema DEFAULT_SCHEMA = createDefault();
public static Schema createDefault() {
var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);
var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);
return new Schema(List.of(strField, intField));
}
public static Schema schemaWithChildren() {
var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));
return new Schema(List.of(itemField));
}
public static Schema fromJson(String jsonString) {
try {
return Schema.fromJSON(jsonString);
} catch (IOException e) {
throw new ArrowExampleException(e);
}
}
}
他們也有一個(gè)可解析的json表示形式:
{
"fields" : [ {
"name" : "col1",
"nullable" : true,
"type" : {
"name" : "utf8"
},
"children" : [ ]
}, {
"name" : "col2",
"nullable" : true,
"type" : {
"name" : "int",
"bitWidth" : 32,
"isSigned" : true
},
"children" : [ ]
} ]
}
另外,就像Avro一樣,您可以在字段上設(shè)計(jì)復(fù)雜的架構(gòu)和嵌入式值:
public static Schema schemaWithChildren() {
var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));
return new Schema(List.of(itemField));
}
基于上面的的Schema,我們將為我們的類創(chuàng)建一個(gè)DTO:
package com.gkatzioura.arrow;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class DefaultArrowEntry {
private String col1;
private Integer col2;
}
我們的目標(biāo)是將這些Java對象轉(zhuǎn)換為Arrow字節(jié)流。
1. 使用分配器創(chuàng)建 DirectByteBuffer
這些緩沖區(qū)是 堆外的 。您確實(shí)需要釋放所使用的內(nèi)存,但是對于庫用戶而言,這是通過在分配器上執(zhí)行 close() 操作來完成的。在我們的例子中,我們的類將實(shí)現(xiàn) Closeable 接口,該接口將執(zhí)行分配器關(guān)閉操作。
通過使用流api,數(shù)據(jù)將被流傳輸?shù)绞褂肁rrow格式提交的OutPutStream:
package com.gkatzioura.arrow;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;
import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;
public class DefaultEntriesWriter implements Closeable {
private final RootAllocator rootAllocator;
private final VectorSchemaRoot vectorSchemaRoot;//向量分配器創(chuàng)建:
public DefaultEntriesWriter() {
rootAllocator = new RootAllocator();
vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
}
public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
if (batchSize <= 0) {
batchSize = defaultArrowEntries.size();
}
DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
writer.start();
VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
childVector1.reset();
childVector2.reset();
boolean exactBatches = defaultArrowEntries.size()%batchSize == 0;
int batchCounter = 0;
for(int i=0; i < defaultArrowEntries.size(); i++) {
childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());
batchCounter++;
if(batchCounter == batchSize) {
vectorSchemaRoot.setRowCount(batchSize);
writer.writeBatch();
batchCounter = 0;
}
}
if(!exactBatches) {
vectorSchemaRoot.setRowCount(batchCounter);
writer.writeBatch();
}
writer.end();
} catch (IOException e) {
throw new ArrowExampleException(e);
}
}
@Override
public void close() throws IOException {
vectorSchemaRoot.close();
rootAllocator.close();
}
}
為了在Arrow上顯示批處理的支持,已在函數(shù)中實(shí)現(xiàn)了簡單的批處理算法。對于我們的示例,只需考慮將數(shù)據(jù)分批寫入。
讓我們深入了解上面代碼功能:
向量分配器創(chuàng)建:
public DefaultEntriesToBytesConverter() {
rootAllocator = new RootAllocator();
vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
}
然后在寫入流時(shí),實(shí)現(xiàn)并啟動了Arrow流編寫器
ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, Channels.newChannel(out));
writer.start();
我們將數(shù)據(jù)填充向量,然后還重置它們,但讓預(yù)分配的緩沖區(qū) 存在 :
VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
childVector1.reset();
childVector2.reset();
寫入數(shù)據(jù)時(shí),我們使用 setSafe 操作。如果需要分配更多的緩沖區(qū),應(yīng)采用這種方式。對于此示例,此操作在每次寫入時(shí)都完成,但是在考慮了所需的操作和緩沖區(qū)大小后可以避免:
childVector1.setSafe(i, new Text(defaultArrowEntries.get(i).getCol1()));
childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());
然后,將批處理寫入流中:
vectorSchemaRoot.setRowCount(batchSize);
writer.writeBatch();
最后但并非最不重要的一點(diǎn)是,我們關(guān)閉了writer:
@Override
public void close() throws IOException {
vectorSchemaRoot.close();
rootAllocator.close();
}
以上就是JVM上高性能數(shù)據(jù)格式庫包Apache Arrow入門和架構(gòu)詳解(Gkatziouras)的詳細(xì)內(nèi)容,更多關(guān)于Apache Arrow入門的資料請關(guān)注腳本之家其它相關(guān)文章!