Services
The rac-delta SDK has 4 services that will be responsible of all operations in the pipelines, and can be used individually for custom operations or customization of pipelines.
Hasher service
The hasher service is responsible of implementing hashing to create file and chunk hashes, to verify integrity and to generate FileEntry and Chunk objects with its hashes.
Our NodeJS version uses HashWasm with blake3 for this, and the Rust version uses blake3 crate.
It has stream support using streamHash method, that will generate a Chunk object. You can use it to generate the chunks array of a file, but delta service already implements a method for stream that uses this.
- Node.js
- Rust
Example usage of hasher service:
// We hash a file and returns a fileEntry, that contains meta data and a list of its chunks (and chunk hashes)
const fileEntry = await racDeltaClient.hasher.hashFile('my-dir/file.txt', 'my-dir', 1024 * 1024);
Example usage of hasher service:
// We hash a file and returns a fileEntry, that contains meta data and a list of its chunks (and chunk hashes)
let file_entry = client.hasher.hash_file("dir/file.txt", "dir", 1024 * 1024).await?;
Validation service
The validation service is used only for validation of files and rd-index.json files. It uses the hasher service for this, has basic methods that returns boolean.
- Node.js
- Rust
Example usage of validation service:
// We generate a FileEntry or we can get it from our rd-index.json
const fileEntry = await racDeltaClient.hasher.hashFile('my-dir/file.txt', 'my-dir', 1024 * 1024);
// Validates the file entry hash with the actual file hash
const valid = await racDeltaClient.validation.validateFile(fileEntry, 'my-dir/file.txt');
Example usage of validation service:
// We generate a FileEntry or we can get it from our rd-index.json
let file_entry = client.hasher.hash_file("dir/file.txt", "dir", 1024 * 1024).await?;
// Validates the file entry hash with the actual file hash
let valid = client.validation.validate_file(&file_entry, "dir/file.txt").await?;
Delta service
The delta service is responsible of mainly creating rd-indexes, comparing rd-indexes or merging rd-indexes.
Comparing two rd-index files will generate a Delta Plan that will include the changes to upload or download.
- Node.js
- Rust
Example usage of delta service:
// Generate a rd-index object from directory, (path, chunkSize, concurrency, ignore patterns)
const index = await racDeltaClient.delta.createIndexFromDirectory('my-dir', 1024 * 1024, 6, ['*.zip']);
Example usage of delta service:
// Generate a rd-index object from directory, (path, chunkSize, concurrency, ignore patterns)
let index = client.delta.create_index_from_directory(Path::new("my-dir"), 1024 * 1024, Some(6), Some(vec![String::from("*.zip")])).await?;
Streaming
Delta service offers a way to generate file entries via streaming, perfect for use cases where memory is low.
- Node.js
- Rust
Generating a rd-index via streaming:
//
// Adapter: Readable → AsyncChunkStream
//
class ReadableChunkStream implements AsyncChunkStream {
private reader: AsyncIterable<Uint8Array>;
constructor(path: string, chunkSize: number) {
this.reader = createReadStream(path, {
highWaterMark: chunkSize,
});
}
async *[Symbol.asyncIterator]() {
for await (const chunk of this.reader) {
yield new Uint8Array(chunk);
}
}
async nextChunk(): Promise<Uint8Array | null> {
const iterator = this[Symbol.asyncIterator]();
const result = await iterator.next();
return result.done ? null : result.value;
}
}
const racDeltaClient = await RacDeltaClient.create({
chunkSize: 1024 * 1024,
maxConcurrency: 6,
storage: {
type: 'ssh',
host: 'localhost',
pathPrefix: '/root/upload',
port: 2222,
credentials: {
username: 'root',
password: 'password',
},
},
});
const files = await readdir('my-dir');
const entries: FileEntry[] = [];
for (const name of files) {
const full = join('my-dir', name);
const stats = await stat(full);
if (!stats.isFile()) {
continue;
}
const stream = new ReadableChunkStream(full, 1024 * 1024);
const entry = await racDeltaClient.delta.createFileEntryFromStream(stream, full);
entries.push(entry);
}
const rdIndex: RDIndex = {
version: 1,
createdAt: Date.now(),
chunkSize: 1024 * 1024,
files: entries,
};
Generating a rd-index via streaming:
use async_trait::async_trait;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
pub struct ReadableChunkStream {
reader: BufReader<File>,
chunk_size: usize,
}
impl ReadableChunkStream {
pub async fn new(path: &str, chunk_size: usize) -> std::io::Result<Self> {
let file = File::open(path).await?;
Ok(Self {
reader: BufReader::new(file),
chunk_size,
})
}
}
#[async_trait]
impl AsyncChunkStream for ReadableChunkStream {
async fn next_chunk(&mut self) -> Option<Vec<u8>> {
let mut buffer = vec![0u8; self.chunk_size];
match self.reader.read(&mut buffer).await {
Ok(0) => None, // EOF
Ok(n) => {
buffer.truncate(n);
Some(buffer)
}
Err(_) => None,
}
}
}
pub async fn build_rdindex() -> Result<RDIndex, Box<dyn std::error::Error>> {
let config = RacDeltaConfig {
chunk_size: 1024 * 1024,
max_concurrency: Some(6),
storage: StorageConfig::SSH(SSHStorageConfig {
base: BaseStorageConfig {
path_prefix: Some("/root/upload".to_string()),
},
host: "localhost".to_string(),
port: Some(2222),
credentials: SSHCredentials {
username: "root".to_string(),
password: Some("password".to_string()),
private_key: None,
},
}),
};
let client = RacDeltaClient::new(config).await?;
let mut entries: Vec<FileEntry> = Vec::new();
let dir = "my-dir";
let mut read_dir = tokio::fs::read_dir(dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
let path = entry.path();
if !path.is_file() {
continue;
}
let full_path_str = path.to_string_lossy().to_string();
let mut stream = ReadableChunkStream::new(&full_path_str, 1024 * 1024).await?;
let file_entry = client
.delta
.create_file_entry_from_stream(&mut stream, &full_path_str)
.await?;
entries.push(file_entry);
}
let rd_index = RDIndex {
version: 1,
created_at: chrono::Utc::now().timestamp_millis() as u64,
chunk_size: 1024 * 1024,
files: entries,
};
Ok(rd_index)
}
Reconstruction service
The reconstruction service is a complex service that is responsible of reconstruct files via chunks.
It can reconstruct a single file or all files of a given delta plan.
For reconstruction, the service uses a ChunkSource, a special Class that connects directly with your storage adapter, your memory, or your disk, depending on how you want your chunks managed.
It uses different reconstruction strategies, depending on file size or chunk source. You can specify a file size threshold for in-place reconstruction or temp reconstruction.
- Node.js
- Rust
Example usage of reconstruction service:
const racDeltaClient = await RacDeltaClient.create({
chunkSize: 1024 * 1024,
maxConcurrency: 6,
storage: {
type: 'ssh',
host: 'localhost',
pathPrefix: '/root/upload',
port: 2222,
credentials: {
username: 'root',
password: 'password',
},
},
});
// Create a Chunk Source using our current storage (SSH in this case)
const chunkSource = new StorageChunkSource(racDeltaClient.storage);
// Chunk sources brings a lot of possibilities, like reconstructing half of the files via storage and the other half via disk
// But in this case we reconstruct all the files via storage
await racDeltaClient.reconstruction.reconstructFile(
remoteIndex.files[0],
'output-dir/file.txt',
chunkSource,
{ inPlaceReconstructionThreshold: 0 }
);
Example usage of reconstruction service:
let config = RacDeltaConfig {
chunk_size: 1024 * 1024,
max_concurrency: Some(6),
storage: StorageConfig::SSH(SSHStorageConfig {
base: BaseStorageConfig {
path_prefix: Some("/root/upload".to_string()),
},
host: "localhost".to_string(),
port: Some(2222),
credentials: SSHCredentials {
username: "root".to_string(),
password: Some("password".to_string()),
private_key: None,
},
}),
};
let client: RacDeltaClient = RacDeltaClient::new(config).await?;
let chunk_source: StorageChunkSource = StorageChunkSource::new(client.storage, None);
client
.reconstruction
.reconstruct_file(
remote_index.files[0],
Path::new("output-dir/file.txt"),
&chunk_source,
Some(&ReconstructionOptions {
in_place_reconstruction_threshold: Some(0),
..Default::default()
}),
None,
)
.await?;
For more, see ReconstructionService
Streaming
Reconstruction service also supports streaming reconstruction using the reconstructToStream method.
This will return a stream perfect for reconstruct the file to another source, like upload to third party storage or for compression.
- Node.js
- Rust
Example usage of reconstruction stream:
const racDeltaClient = await RacDeltaClient.create({
chunkSize: 1024 * 1024,
maxConcurrency: 6,
storage: {
type: 'ssh',
host: 'localhost',
pathPrefix: '/root/upload',
port: 2222,
credentials: {
username: 'root',
password: 'password',
},
},
});
// Create a Chunk Source using our current storage (SSH in this case)
const chunkSource = new StorageChunkSource(racDeltaClient.storage);
const stream = await racDeltaClient.reconstruction.reconstructToStream(
remoteIndex.files[0],
chunkSource
);
// Example stream upload to S3
await s3.send(
new PutObjectCommand({
Bucket: 'my-bucket',
Key: 'files/file.txt',
Body: stream,
})
);
Example usage of reconstruction stream:
let config = RacDeltaConfig {
chunk_size: 1024 * 1024,
max_concurrency: Some(6),
storage: StorageConfig::SSH(SSHStorageConfig {
base: BaseStorageConfig {
path_prefix: Some("/root/upload".to_string()),
},
host: "localhost".to_string(),
port: Some(2222),
credentials: SSHCredentials {
username: "root".to_string(),
password: Some("password".to_string()),
private_key: None,
},
}),
};
let client: RacDeltaClient = RacDeltaClient::new(config).await?;
let chunk_source: StorageChunkSource = StorageChunkSource::new(client.storage, None);
let stream: Pin<Box<dyn AsyncRead + Send + Sync>> = client
.reconstruction
.reconstruct_to_stream(remote_index.files[0], Arc::new(chunk_source))
.await?;
let byte_stream = ByteStream::from_reader(stream);
s3_client
.put_object()
.bucket("my-bucket")
.key("files/file.txt")
.body(byte_stream)
.send()
.await?;