Servicios
El SDK de rac-delta tiene 4 servicios que serán responsables de todas las operaciones de las pipelines, y pueden ser usados de forma individual para operaciones personalizadas o personalización de pipelines.
Servicio de hasher
El servicio de hasher se encarga de implementar el hashing para crear los hashes de los archivos y los chunks, para verificar la integridad y para generar los objetos FileEntry y Chunk con sus hashes.
Nuestra versión de NodeJs usa HashWasm con blake3 para esto, y la versión de Rust usa la crate blake3.
Tiene soporte para stream usando el método streamHash, que generará un objeto Chunk. Puedes usarlo para generar el array de chunks de un archivo, pero el servicio delta ya implementa un método de streaming que usa esto.
- Node.js
- Rust
Ejemplo de uso del servicio hasher:
// Hasheamos un archivo y retorna un fileEntry, que contiene meta datos y una lista de sus chunks (y sus hashes)
const fileEntry = await racDeltaClient.hasher.hashFile('my-dir/file.txt', 'my-dir', 1024 * 1024);
Ejemplo de uso del servicio hasher:
// Hasheamos un archivo y retorna un fileEntry, que contiene meta datos y una lista de sus chunks (y sus hashes)
let file_entry = client.hasher.hash_file("dir/file.txt", "dir", 1024 * 1024).await?;
Servicio de validación
El servicio de validación se usa únicamente para validar archivos e índices. Usa el servicio de hash para ello. Tiene métodos básicos que devuelven booleanos.
- Node.js
- Rust
Ejemplo de uso del servicio de validación:
// Generamos un FileEntry o lo sacamos de nuestro rd-index.json
const fileEntry = await racDeltaClient.hasher.hashFile('my-dir/file.txt', 'my-dir', 1024 * 1024);
// Valida el hash del fileEntry con el verdadero hash del archivo
const valid = await racDeltaClient.validation.validateFile(fileEntry, 'my-dir/file.txt');
Ejemplo de uso del servicio de validación:
// Generamos un FileEntry o lo sacamos de nuestro rd-index.json
let file_entry = client.hasher.hash_file("dir/file.txt", "dir", 1024 * 1024).await?;
// Valida el hash del fileEntry con el verdadero hash del archivo
let valid = client.validation.validate_file(&file_entry, "dir/file.txt").await?;
Servicio delta
El servicio delta se encarga principalmente de crear los rd-index, compararlos y mergearlos.
Comparar dos archivos rd-index generará un Delta Plan que incluirá los cambios a subir o descargar.
- Node.js
- Rust
Ejemplo de uso del servicio delta:
// Genera un objeto rd-index a partir de un directorio, (ruta, tamaño de chunk, concurrencia, patrones a ignorar)
const index = await racDeltaClient.delta.createIndexFromDirectory('my-dir', 1024 * 1024, 6, ['*.zip']);
Ejemplo de uso del servicio delta:
// Genera un objeto rd-index a partir de un directorio, (ruta, tamaño de chunk, concurrencia, patrones a ignorar)
let index = client.delta.create_index_from_directory(Path::new("my-dir"), 1024 * 1024, Some(6), Some(vec![String::from("*.zip")])).await?;
Streaming
El servicio delta ofrece una forma de generar file entries vía streaming, perfecto para casos de uso donde la memoria no abunda.
- Node.js
- Rust
Generar un rd-index vía streaming:
//
// Adapter: Readable → AsyncChunkStream
//
class ReadableChunkStream implements AsyncChunkStream {
private reader: AsyncIterable<Uint8Array>;
constructor(path: string, chunkSize: number) {
this.reader = createReadStream(path, {
highWaterMark: chunkSize,
});
}
async *[Symbol.asyncIterator]() {
for await (const chunk of this.reader) {
yield new Uint8Array(chunk);
}
}
async nextChunk(): Promise<Uint8Array | null> {
const iterator = this[Symbol.asyncIterator]();
const result = await iterator.next();
return result.done ? null : result.value;
}
}
const racDeltaClient = await RacDeltaClient.create({
chunkSize: 1024 * 1024,
maxConcurrency: 6,
storage: {
type: 'ssh',
host: 'localhost',
pathPrefix: '/root/upload',
port: 2222,
credentials: {
username: 'root',
password: 'password',
},
},
});
const files = await readdir('my-dir');
const entries: FileEntry[] = [];
for (const name of files) {
const full = join('my-dir', name);
const stats = await stat(full);
if (!stats.isFile()) {
continue;
}
const stream = new ReadableChunkStream(full, 1024 * 1024);
const entry = await racDeltaClient.delta.createFileEntryFromStream(stream, full);
entries.push(entry);
}
const rdIndex: RDIndex = {
version: 1,
createdAt: Date.now(),
chunkSize: 1024 * 1024,
files: entries,
};
Generar un rd-index vía streaming:
use async_trait::async_trait;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
pub struct ReadableChunkStream {
reader: BufReader<File>,
chunk_size: usize,
}
impl ReadableChunkStream {
pub async fn new(path: &str, chunk_size: usize) -> std::io::Result<Self> {
let file = File::open(path).await?;
Ok(Self {
reader: BufReader::new(file),
chunk_size,
})
}
}
#[async_trait]
impl AsyncChunkStream for ReadableChunkStream {
async fn next_chunk(&mut self) -> Option<Vec<u8>> {
let mut buffer = vec![0u8; self.chunk_size];
match self.reader.read(&mut buffer).await {
Ok(0) => None, // EOF
Ok(n) => {
buffer.truncate(n);
Some(buffer)
}
Err(_) => None,
}
}
}
pub async fn build_rdindex() -> Result<RDIndex, Box<dyn std::error::Error>> {
let config = RacDeltaConfig {
chunk_size: 1024 * 1024,
max_concurrency: Some(6),
storage: StorageConfig::SSH(SSHStorageConfig {
base: BaseStorageConfig {
path_prefix: Some("/root/upload".to_string()),
},
host: "localhost".to_string(),
port: Some(2222),
credentials: SSHCredentials {
username: "root".to_string(),
password: Some("password".to_string()),
private_key: None,
},
}),
};
let client = RacDeltaClient::new(config).await?;
let mut entries: Vec<FileEntry> = Vec::new();
let dir = "my-dir";
let mut read_dir = tokio::fs::read_dir(dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
let path = entry.path();
if !path.is_file() {
continue;
}
let full_path_str = path.to_string_lossy().to_string();
let mut stream = ReadableChunkStream::new(&full_path_str, 1024 * 1024).await?;
let file_entry = client
.delta
.create_file_entry_from_stream(&mut stream, &full_path_str)
.await?;
entries.push(file_entry);
}
let rd_index = RDIndex {
version: 1,
created_at: chrono::Utc::now().timestamp_millis() as u64,
chunk_size: 1024 * 1024,
files: entries,
};
Ok(rd_index)
}
Servicio de reconstrucción
El servicio de reconstrucción es un servicio complejo que se encarga de reconstruir archivos a partir de chunks.
Puede reconstruir un solo archivo o todos los archivos de un plan delta.
Para la reconstrucción, el servicio utiliza un ChunkSource, una clase especial que conecta directamente con tu adaptador de almacenamiento, tu memoria o tu disco, dependiendo de cómo quieras manejar los chunks.
Usa distintas estrategias de reconstrucción, dependiendo en el tamaño de archivo o del chunk source. Puedes especificar un umbral de tamaño de archivo para reconstruir in-place o en temporal.
- Node.js
- Rust
Ejemplo de uso del servicio de reconstrucción:
const racDeltaClient = await RacDeltaClient.create({
chunkSize: 1024 * 1024,
maxConcurrency: 6,
storage: {
type: 'ssh',
host: 'localhost',
pathPrefix: '/root/upload',
port: 2222,
credentials: {
username: 'root',
password: 'password',
},
},
});
// Crea un Chunk Source usando nuestro almacenamiento actual (SSH en este caso)
const chunkSource = new StorageChunkSource(racDeltaClient.storage);
// Los chunk sources ofrecen un montón de posibilidades, como reconstruir la mitad de los archivos vía almacenamiento y la otra mitad vía disco
// Pero en este caso lo reconstruiremos todo vía almacenamiento
await racDeltaClient.reconstruction.reconstructFile(
remoteIndex.files[0],
'output-dir/file.txt',
chunkSource,
{ inPlaceReconstructionThreshold: 0 } // No usamos in-place
);
Ejemplo de uso del servicio de reconstrucción:
let config = RacDeltaConfig {
chunk_size: 1024 * 1024,
max_concurrency: Some(6),
storage: StorageConfig::SSH(SSHStorageConfig {
base: BaseStorageConfig {
path_prefix: Some("/root/upload".to_string()),
},
host: "localhost".to_string(),
port: Some(2222),
credentials: SSHCredentials {
username: "root".to_string(),
password: Some("password".to_string()),
private_key: None,
},
}),
};
let client: RacDeltaClient = RacDeltaClient::new(config).await?;
let chunk_source: StorageChunkSource = StorageChunkSource::new(client.storage, None);
client
.reconstruction
.reconstruct_file(
remote_index.files[0],
Path::new("output-dir/file.txt"),
&chunk_source,
Some(&ReconstructionOptions {
in_place_reconstruction_threshold: Some(0),
..Default::default()
}),
None,
)
.await?;
Para más, echa un ojo a ReconstructionService
Streaming
El servicio de reconstrucción también tiene soporte para reconstrucción en streaming usando el método reconstructToStream.
Esto devolverá un stream perfecto para reconstruir archivos hacia otra fuente, como subir a almacenamiento third party o para compresión.
- Node.js
- Rust
Ejemplo de uso de stream de reconstrucción:
const racDeltaClient = await RacDeltaClient.create({
chunkSize: 1024 * 1024,
maxConcurrency: 6,
storage: {
type: 'ssh',
host: 'localhost',
pathPrefix: '/root/upload',
port: 2222,
credentials: {
username: 'root',
password: 'password',
},
},
});
// Crea un Chunk Source usando nuestro almacenamiento actual (SSH en este caso)
const chunkSource = new StorageChunkSource(racDeltaClient.storage);
const stream = await racDeltaClient.reconstruction.reconstructToStream(
remoteIndex.files[0],
chunkSource
);
// Subida de stream de ejemplo a S3
await s3.send(
new PutObjectCommand({
Bucket: 'my-bucket',
Key: 'files/file.txt',
Body: stream,
})
);
Ejemplo de uso de stream de reconstrucción:
let config = RacDeltaConfig {
chunk_size: 1024 * 1024,
max_concurrency: Some(6),
storage: StorageConfig::SSH(SSHStorageConfig {
base: BaseStorageConfig {
path_prefix: Some("/root/upload".to_string()),
},
host: "localhost".to_string(),
port: Some(2222),
credentials: SSHCredentials {
username: "root".to_string(),
password: Some("password".to_string()),
private_key: None,
},
}),
};
let client: RacDeltaClient = RacDeltaClient::new(config).await?;
let chunk_source: StorageChunkSource = StorageChunkSource::new(client.storage, None);
let stream: Pin<Box<dyn AsyncRead + Send + Sync>> = client
.reconstruction
.reconstruct_to_stream(remote_index.files[0], Arc::new(chunk_source))
.await?;
let byte_stream = ByteStream::from_reader(stream);
s3_client
.put_object()
.bucket("my-bucket")
.key("files/file.txt")
.body(byte_stream)
.send()
.await?;