09c30f97e9
Consolidated duplicated helpers into Elysium.Common.ps1: - Settings parsing (Read-KeyValueSettingsFile, Read-ElysiumSettings, Get-SettingsValue) - Azure Blob URI builder (Build-BlobUri) - S3 SigV4 signing helpers and AWS module bootstrap - AD credential validation and replication permission pre-check - Parallel execution helper (Get-FunctionDefinitionText) Test-WeakADPasswords.ps1 and Extract-NTHashes.ps1 now import Elysium.Common.ps1 for the first time. Update-KHDB.ps1 and Prepare-KHDBStorage.ps1 removed their local duplicates. Deleted legacy Settings.ps1 (superseded by ElysiumSettings.txt). Removed stray placeholder comment in Elysium.ps1. All versions bumped to unified v2.2.1.
1161 lines
51 KiB
PowerShell
1161 lines
51 KiB
PowerShell
##################################################
|
|
## ____ ___ ____ _____ _ _ _____ _____ ##
|
|
## / ___/ _ \| _ \| ____| | \ | | ____|_ _| ##
|
|
## | | | | | | |_) | _| | \| | _| | | ##
|
|
## | |__| |_| | _ <| |___ _| |\ | |___ | | ##
|
|
## \____\__\_\_| \_\_____(_)_| \_|_____| |_| ##
|
|
##################################################
|
|
## Project: Elysium ##
|
|
## File: Prepare-KHDBStorage.ps1 ##
|
|
## Version: 2.2.1 ##
|
|
## Support: support@cqre.net ##
|
|
##################################################
|
|
|
|
<#
|
|
.SYNOPSIS
|
|
Prepares sharded KHDB content for remote storage.
|
|
|
|
.DESCRIPTION
|
|
Splits a monolithic khdb.txt into two-hex prefix shards, generates a manifest
|
|
compatible with Update-KHDB.ps1, and optionally uploads both manifest and shards
|
|
to Azure Blob Storage or an S3-compatible bucket.
|
|
#>
|
|
|
|
[CmdletBinding()]
param(
    # Input to prepare; the only mandatory argument.
    [Parameter(Mandatory)]
    [ValidateNotNullOrEmpty()]
    [string] $SourcePath,

    [string] $OutputRoot,

    # Accepted range is 1-8; defaults to 2.
    [ValidateRange(1, 8)]
    [int] $ShardSize = 2,

    [string] $ManifestVersion,

    # Upload target: 'None', 'Azure' or 'S3'.
    [ValidateSet('None', 'Azure', 'S3')]
    [string] $StorageProvider = 'None',

    # --- Azure options ---
    [string] $StorageAccountName,
    [string] $ContainerName,
    [string] $SasToken,

    # --- S3-compatible options ---
    [string] $S3EndpointUrl,
    [string] $S3Region = 'us-east-1',
    [string] $S3BucketName,
    [string] $S3AccessKeyId,
    [string] $S3SecretAccessKey,
    [bool] $S3ForcePathStyle = $true,

    # --- Remote layout ---
    [string] $ManifestRemotePath = 'khdb/manifest.json',
    [string] $ShardRemotePrefix = 'khdb/shards',

    # --- Workflow switches ---
    [switch] $SkipUpload,
    [switch] $UploadOnly,

    # --- Progress reporting ---
    [switch] $ShowProgress,
    [int] $ProgressUpdateInterval = 100000,

    # Accepted range is 1-64; defaults to 5.
    [ValidateRange(1, 64)]
    [int] $MaxParallelTransfers = 5,

    [switch] $ForcePlainText,

    # --- Checkpointing ---
    [string] $CheckpointPath,
    [switch] $NoCheckpoint,

    [switch] $Force,

    # Settings file to read; ElysiumSettings.txt next to the script is tried when omitted.
    [string] $SettingsPath
)
|
|
|
|
# Fail fast: every error terminates, and strict mode rejects uninitialised variables/properties.
$ErrorActionPreference = 'Stop'
Set-StrictMode -Version Latest

# Shared helpers live next to this script and are dot-sourced into the current scope.
[string]$commonHelper = Join-Path -Path $PSScriptRoot -ChildPath 'Elysium.Common.ps1'
if (-not (Test-Path -LiteralPath $commonHelper)) {
    throw "Common helper not found at $commonHelper"
}
. $commonHelper

$relaunchArguments = @{
    BoundParameters  = $PSBoundParameters
    UnboundArguments = $MyInvocation.UnboundArguments
}
Restart-WithPwshIfAvailable @relaunchArguments

# Add TLS 1.2 to whatever protocols are already enabled.
$enabledProtocols = [System.Net.ServicePointManager]::SecurityProtocol
[System.Net.ServicePointManager]::SecurityProtocol = $enabledProtocols -bor [System.Net.SecurityProtocolType]::Tls12

# Load failures are deliberately ignored (SilentlyContinue), exactly as before.
foreach ($assemblyName in 'System.IO.Compression.FileSystem', 'System.Net.Http') {
    Add-Type -AssemblyName $assemblyName -ErrorAction SilentlyContinue
}
|
|
|
|
function Ensure-Directory {
    # Creates the directory at $Path when it does not exist yet.
    # A null, empty or whitespace-only path is silently ignored.
    param([string]$Path)

    $hasPath = -not [string]::IsNullOrWhiteSpace($Path)
    if ($hasPath -and -not (Test-Path -LiteralPath $Path)) {
        $null = New-Item -Path $Path -ItemType Directory -Force
    }
}
|
|
|
|
function Remove-DirectoryContents {
    # Deletes everything inside $Path (hidden items included) while keeping the
    # directory itself. Does nothing when $Path does not exist.
    param([string]$Path)

    if (Test-Path -LiteralPath $Path) {
        foreach ($child in (Get-ChildItem -LiteralPath $Path -Force)) {
            Remove-Item -LiteralPath $child.FullName -Recurse -Force
        }
    }
}
|
|
|
|
function Merge-ShardsToFile {
    # Concatenates the shard files listed in $Manifest.shards (sorted by their
    # 'name' property) into $TargetPath, written as UTF-8 without a BOM.
    # Each line is trimmed and blank lines are dropped. Entries with a blank name
    # are skipped; a named shard missing under $ShardsRoot raises an error.
    param(
        [psobject]$Manifest,
        [string]$ShardsRoot,
        [string]$TargetPath
    )

    $utf8NoBom = [System.Text.UTF8Encoding]::new($false)
    $output = [System.IO.StreamWriter]::new($TargetPath, $false, $utf8NoBom, 1048576)
    try {
        $orderedShards = $Manifest.shards | Sort-Object name
        foreach ($shard in $orderedShards) {
            $shardName = [string]$shard.name
            if ([string]::IsNullOrWhiteSpace($shardName)) { continue }

            $shardFile = Join-Path -Path $ShardsRoot -ChildPath $shardName
            if (-not (Test-Path -LiteralPath $shardFile)) {
                throw "Missing shard on disk: $shardName"
            }

            $source = [System.IO.StreamReader]::new($shardFile, [System.Text.Encoding]::UTF8, $true, 1048576)
            try {
                for (;;) {
                    $text = $source.ReadLine()
                    if ($null -eq $text) { break }
                    $clean = $text.Trim()
                    if ($clean.Length -gt 0) { $output.WriteLine($clean) }
                }
            } finally {
                $source.Dispose()
            }
        }
    } finally {
        $output.Dispose()
    }
}
|
|
|
|
function Get-NormalizedForwardPath {
    # Converts backslashes to forward slashes and strips leading/trailing '/'.
    # Null, empty or whitespace-only input yields an empty string.
    param([string]$PathValue)

    if (-not [string]::IsNullOrWhiteSpace($PathValue)) {
        $forward = $PathValue -replace '\\', '/'
        return $forward.Trim('/')
    }
    return ''
}
|
|
|
|
function Upload-AzureBlob {
    <#
    .SYNOPSIS
    Uploads one local file to Azure Blob Storage with a single PUT request.

    .DESCRIPTION
    The target URI comes from Build-BlobUri. The request carries
    'x-ms-blob-type: BlockBlob' and 'x-ms-version: 2020-10-02', and the file is
    streamed from disk as the request body. A non-success HTTP status raises
    the exception thrown by EnsureSuccessStatusCode().

    .PARAMETER Account
    Passed to Build-BlobUri as -Account.
    .PARAMETER Container
    Passed to Build-BlobUri as -Container.
    .PARAMETER Sas
    Passed to Build-BlobUri as -Sas.
    .PARAMETER BlobName
    Passed to Build-BlobUri as -BlobName.
    .PARAMETER FilePath
    Local file whose bytes form the request body.
    .PARAMETER ContentType
    Content-Type of the body; 'application/octet-stream' when blank.
    #>
    param(
        [string]$Account,
        [string]$Container,
        [string]$Sas,
        [string]$BlobName,
        [string]$FilePath,
        [string]$ContentType
    )

    $uri = Build-BlobUri -Account $Account -Container $Container -Sas $Sas -BlobName $BlobName
    if ([string]::IsNullOrWhiteSpace($ContentType)) {
        $ContentType = 'application/octet-stream'
    }

    $request = $null
    $stream = $null
    $response = $null
    # NOTE(review): HttpClient's default 100-second timeout applies here; confirm
    # that the largest shard uploads within it.
    $client = [System.Net.Http.HttpClient]::new()
    try {
        $request = [System.Net.Http.HttpRequestMessage]::new([System.Net.Http.HttpMethod]::Put, $uri)
        $stream = [System.IO.File]::OpenRead($FilePath)

        $content = New-Object System.Net.Http.StreamContent($stream)
        $content.Headers.ContentType = [System.Net.Http.Headers.MediaTypeHeaderValue]::Parse($ContentType)
        $request.Content = $content
        $request.Headers.TryAddWithoutValidation('x-ms-blob-type', 'BlockBlob') | Out-Null
        $request.Headers.TryAddWithoutValidation('x-ms-version', '2020-10-02') | Out-Null

        $response = $client.SendAsync($request).GetAwaiter().GetResult()
        $null = $response.EnsureSuccessStatusCode()
    } finally {
        # The response was previously never disposed; release it with the rest.
        if ($null -ne $response) { $response.Dispose() }
        if ($stream) { $stream.Dispose() }
        if ($request) { $request.Dispose() }
        if ($client) { $client.Dispose() }
    }
}
|
|
|
|
function Invoke-S3HttpUpload {
    <#
    .SYNOPSIS
    Uploads one local file to an S3-compatible endpoint with a signed PUT request.

    .DESCRIPTION
    The target URI comes from BuildS3Uri and the request headers from
    BuildAuthHeaders (method 'PUT'); every returned header is copied onto the
    request. The file is streamed from disk as the body. A non-success HTTP
    status raises the exception thrown by EnsureSuccessStatusCode().

    .PARAMETER EndpointUrl
    Passed to BuildS3Uri as -endpointUrl.
    .PARAMETER Bucket
    Passed to BuildS3Uri as -bucket.
    .PARAMETER Key
    Passed to BuildS3Uri as -key.
    .PARAMETER FilePath
    Local file whose bytes form the request body.
    .PARAMETER Region
    Passed to BuildAuthHeaders as -region.
    .PARAMETER AccessKeyId
    Passed to BuildAuthHeaders as -accessKey.
    .PARAMETER SecretAccessKey
    Passed to BuildAuthHeaders as -secretKey.
    .PARAMETER ForcePathStyle
    Passed to BuildS3Uri as -forcePathStyle.
    .PARAMETER PayloadHash
    Passed to BuildAuthHeaders as -payloadHash.
    NOTE(review): presumably the hex SHA-256 of the file content; confirm with the caller.
    .PARAMETER ContentType
    Content-Type of the body; 'application/octet-stream' when blank.
    #>
    param(
        [string]$EndpointUrl,
        [string]$Bucket,
        [string]$Key,
        [string]$FilePath,
        [string]$Region,
        [string]$AccessKeyId,
        [string]$SecretAccessKey,
        [bool]$ForcePathStyle,
        [string]$PayloadHash,
        [string]$ContentType
    )

    $uri = BuildS3Uri -endpointUrl $EndpointUrl -bucket $Bucket -key $Key -forcePathStyle $ForcePathStyle
    $headers = BuildAuthHeaders -method 'PUT' -uri $uri -region $Region -accessKey $AccessKeyId -secretKey $SecretAccessKey -payloadHash $PayloadHash
    if ([string]::IsNullOrWhiteSpace($ContentType)) {
        $ContentType = 'application/octet-stream'
    }

    $request = $null
    $stream = $null
    $response = $null
    # NOTE(review): HttpClient's default 100-second timeout applies here; confirm
    # that the largest shard uploads within it.
    $client = [System.Net.Http.HttpClient]::new()
    try {
        $request = [System.Net.Http.HttpRequestMessage]::new([System.Net.Http.HttpMethod]::Put, $uri)
        foreach ($kvp in $headers.GetEnumerator()) {
            $request.Headers.TryAddWithoutValidation($kvp.Key, $kvp.Value) | Out-Null
        }

        $stream = [System.IO.File]::OpenRead($FilePath)
        $content = New-Object System.Net.Http.StreamContent($stream)
        $content.Headers.ContentType = [System.Net.Http.Headers.MediaTypeHeaderValue]::Parse($ContentType)
        $request.Content = $content

        $response = $client.SendAsync($request, [System.Net.Http.HttpCompletionOption]::ResponseHeadersRead).GetAwaiter().GetResult()
        $null = $response.EnsureSuccessStatusCode()
    } finally {
        # With ResponseHeadersRead the body is not buffered, so the response must
        # be disposed explicitly; previously it never was.
        if ($null -ne $response) { $response.Dispose() }
        if ($stream) { $stream.Dispose() }
        if ($request) { $request.Dispose() }
        if ($client) { $client.Dispose() }
    }
}
|
|
|
|
function Split-KhdbIntoShards {
    <#
    .SYNOPSIS
    Splits KHDB hash input into one text shard per hex prefix and returns statistics.

    .DESCRIPTION
    -Source may be a plain hash file, a list file whose lines name .gz files, or
    a directory that is searched recursively for .gz files. Every accepted line
    is reduced to an upper-case 32-hex-digit hash and written to
    <ShardRoot>/<prefix>.txt, where <prefix> is the first PrefixLength hex digits
    in lower case. Lines of the form 'hash:count' are de-duplicated against the
    directly preceding hash of the same shard. Lines that cannot be reduced to a
    hash are counted and, when -InvalidOutputPath is set, written to that file.

    Script blocks invoked with '&' run in a child scope, so assigning to a plain
    variable inside them never reaches this function's scope. All mutable run
    state (counters, the invalid-line writer, the progress label) therefore
    lives in the shared $meta hashtable.

    .PARAMETER Source
    File or directory to read.
    .PARAMETER ShardRoot
    Directory receiving the shard files; created when missing.
    .PARAMETER PrefixLength
    Number of leading hex digits that select the shard.
    .PARAMETER InvalidOutputPath
    Optional file receiving every rejected line.
    .PARAMETER ShowProgress
    Emit Write-Progress updates. Checkpoints are written from the same hook.
    .PARAMETER ProgressUpdateInterval
    Line interval between progress updates; values below 1 fall back to 100000.
    .PARAMETER ProgressActivity
    Activity text for Write-Progress.
    .PARAMETER ForcePlainText
    Treat the source strictly as a plain hash file; lines ending in '.gz' are skipped.
    .PARAMETER EnableCheckpoint
    Write checkpoint JSON to -CheckpointPath. Requires -ForcePlainText.
    .PARAMETER CheckpointPath
    Destination of the checkpoint JSON.
    .PARAMETER Resume
    Continue from -ResumeState instead of starting over. Requires -EnableCheckpoint.
    .PARAMETER ResumeState
    Previously saved checkpoint object (same shape as the JSON written here).

    .OUTPUTS
    PSCustomObject with TotalEntries, ShardStats (prefix -> Path/Count),
    TotalLines, InvalidLines, SkippedLines, LegacyCount, InvalidSamples and
    InvalidOutputPath.
    #>
    param(
        [string]$Source,
        [string]$ShardRoot,
        [int]$PrefixLength,
        [string]$InvalidOutputPath,
        [switch]$ShowProgress,
        [int]$ProgressUpdateInterval = 100000,
        [string]$ProgressActivity = 'Splitting KHDB',
        [switch]$ForcePlainText,
        [switch]$EnableCheckpoint,
        [string]$CheckpointPath,
        [switch]$Resume,
        [psobject]$ResumeState
    )

    if ($Resume -and -not $EnableCheckpoint) { throw 'Resume requested without a checkpoint.' }
    if ($EnableCheckpoint -and -not $ForcePlainText) { throw 'Checkpointing requires -ForcePlainText so the source is processed as plain text hashes.' }

    $encoding = New-Object System.Text.UTF8Encoding($false)
    $ShardRoot = [System.IO.Path]::GetFullPath($ShardRoot)
    $maxInvalidSamples = 10
    $resumeFilePosition = 0L
    $shardStates = @{}
    $stats = @{}

    # Shared mutable state; see the scope remark in the description.
    $meta = @{
        TotalLines     = 0L
        InvalidLines   = 0L
        SkippedLines   = 0L
        LegacyLines    = 0L
        ValidEntries   = 0L
        CurrentSource  = $null
        InvalidWriter  = $null
        InvalidSamples = New-Object System.Collections.Generic.List[string]
    }

    # -Resume without a state object is a fresh run: shards are recreated.
    $resuming = [bool]$Resume -and ($null -ne $ResumeState)
    $resumeShardLookup = @{}
    if ($resuming) {
        if ($ResumeState.sourcePath -and ($ResumeState.sourcePath -ne $Source)) {
            throw "Checkpoint source '$($ResumeState.sourcePath)' does not match current source '$Source'."
        }
        if ($ResumeState.prefixLength -and ([int]$ResumeState.prefixLength -ne $PrefixLength)) {
            throw "Checkpoint prefix length $($ResumeState.prefixLength) does not match requested $PrefixLength."
        }
        if ($ResumeState.forcePlainText -ne $true) {
            throw 'Checkpoint was created without -ForcePlainText; resume is not supported in that mode.'
        }
        if ($ResumeState.shardRoot) {
            $resumeShardRoot = [System.IO.Path]::GetFullPath([string]$ResumeState.shardRoot)
            if ($resumeShardRoot -ne $ShardRoot) {
                throw "Checkpoint shard root '$resumeShardRoot' does not match target '$ShardRoot'."
            }
        }
        if ($ResumeState.totalLines) { $meta.TotalLines = [long]$ResumeState.totalLines }
        if ($ResumeState.invalidLines) { $meta.InvalidLines = [long]$ResumeState.invalidLines }
        if ($ResumeState.skippedLines) { $meta.SkippedLines = [long]$ResumeState.skippedLines }
        if ($ResumeState.validEntries) { $meta.ValidEntries = [long]$ResumeState.validEntries }
        if ($ResumeState.filePosition) { $resumeFilePosition = [long]$ResumeState.filePosition }
        if ($ResumeState.shardStates) {
            foreach ($savedShard in $ResumeState.shardStates) {
                $savedPrefix = [string]$savedShard.prefix
                if ([string]::IsNullOrWhiteSpace($savedPrefix)) { continue }
                $resumeShardLookup[$savedPrefix] = $savedShard
            }
        }
    }

    # Validate the source before any shard file is created or truncated.
    $sourceItem = Get-Item -LiteralPath $Source -ErrorAction Stop
    if ($ForcePlainText -and $sourceItem.PSIsContainer) {
        throw 'ForcePlainText can only be used when SourcePath is a file. Provide a plain khdb.txt file, not a directory.'
    }
    if ($EnableCheckpoint -and $sourceItem.PSIsContainer) {
        throw 'Checkpointing/resume is only supported when SourcePath points to a plain hash file.'
    }
    $sourceBaseDir = if ($sourceItem.PSIsContainer) { $sourceItem.FullName } else { Split-Path -Parent $sourceItem.FullName }
    $meta.CurrentSource = if ($sourceItem.PSIsContainer) { $sourceItem.FullName } else { $sourceItem.Name }

    if ($ShowProgress -and $ProgressUpdateInterval -lt 1) { $ProgressUpdateInterval = 100000 }
    $progressStopwatch = if ($ShowProgress) { [System.Diagnostics.Stopwatch]::StartNew() } else { $null }
    $plainReader = $null
    $plainBaseStream = $null

    # Opens (or re-opens) the writer for one prefix and registers its state.
    $openShard = {
        param([string]$shardPrefix)
        $shardFile = Join-Path -Path $ShardRoot -ChildPath ("$shardPrefix.txt")
        Ensure-Directory (Split-Path -Path $shardFile -Parent)

        $append = $false
        $lineCount = 0L
        $pendingLine = $null
        $pendingHash = $null
        $pendingCount = -1
        $saved = $resumeShardLookup[$shardPrefix]
        if ($null -ne $saved) {
            # Checkpointed shard: keep its content and restore its counters.
            $append = $true
            if ($null -ne $saved.count) { $lineCount = [long]$saved.count }
            $pendingLine = $saved.pendingLine
            $pendingHash = $saved.pendingHash
            if ($null -ne $saved.pendingCount) { $pendingCount = [int]$saved.pendingCount }
        } elseif ($resuming -and (Test-Path -LiteralPath $shardFile)) {
            # Shard exists but is not in the checkpoint: keep it and count its lines.
            $append = $true
            try {
                $countReader = [System.IO.File]::OpenText($shardFile)
                try {
                    while ($null -ne $countReader.ReadLine()) { $lineCount++ }
                } finally { $countReader.Dispose() }
            } catch { $lineCount = 0L }
        }

        # 1 MiB buffer per writer.
        $shardWriter = New-Object System.IO.StreamWriter($shardFile, $append, $encoding, 1048576)
        $newState = [ordered]@{
            Writer       = $shardWriter
            Path         = $shardFile
            Count        = $lineCount
            PendingLine  = $pendingLine
            PendingHash  = $pendingHash
            PendingCount = $pendingCount
        }
        $shardStates[$shardPrefix] = $newState
        return $newState
    }

    # Counts one rejected line, keeps a sample and mirrors it to the invalid file.
    $recordInvalid = {
        param([string]$text)
        $meta.InvalidLines++
        if ($meta.InvalidSamples.Count -lt $maxInvalidSamples) {
            [void]$meta.InvalidSamples.Add($text)
        }
        if (-not $InvalidOutputPath) { return }
        if ($null -eq $meta.InvalidWriter) {
            $invalidParent = Split-Path -Parent $InvalidOutputPath
            if ($invalidParent) { Ensure-Directory $invalidParent }
            $appendInvalid = $resuming -and (Test-Path -LiteralPath $InvalidOutputPath)
            $meta.InvalidWriter = New-Object System.IO.StreamWriter($InvalidOutputPath, $appendInvalid, $encoding, 1048576)
        }
        $meta.InvalidWriter.WriteLine($text)
    }

    # Flushes all writers and persists the resumable state as JSON.
    $saveCheckpoint = {
        param([long]$filePosition)
        if (-not $EnableCheckpoint) { return }
        foreach ($openState in $shardStates.Values) { $openState.Writer.Flush() }
        if ($null -ne $meta.InvalidWriter) { $meta.InvalidWriter.Flush() }

        $shardSnapshot = New-Object System.Collections.Generic.List[object]
        foreach ($entry in ($shardStates.GetEnumerator() | Sort-Object Key)) {
            $snapshotState = $entry.Value
            $shardSnapshot.Add([ordered]@{
                prefix       = $entry.Key
                count        = [long]$snapshotState.Count
                pendingLine  = $snapshotState.PendingLine
                pendingHash  = $snapshotState.PendingHash
                pendingCount = $snapshotState.PendingCount
            })
        }
        $payload = [ordered]@{
            version        = 1
            savedAt        = (Get-Date).ToUniversalTime().ToString('o')
            sourcePath     = $Source
            shardRoot      = $ShardRoot
            forcePlainText = [bool]$ForcePlainText
            prefixLength   = $PrefixLength
            mode           = 'Plain'
            filePosition   = $filePosition
            totalLines     = [long]$meta.TotalLines
            invalidLines   = [long]$meta.InvalidLines
            skippedLines   = [long]$meta.SkippedLines
            validEntries   = [long]$meta.ValidEntries
            shardStates    = $shardSnapshot.ToArray()
        }
        [System.IO.File]::WriteAllText($CheckpointPath, ($payload | ConvertTo-Json -Depth 6), $encoding)
    }

    # Throttled progress update; also the only place a checkpoint is written.
    $invokeProgress = {
        param([bool]$Force = $false, [string]$Context)
        if (-not $ShowProgress) { return }
        if (-not $progressStopwatch) { return }
        $shouldUpdate = $Force
        if (-not $shouldUpdate) {
            if ($ProgressUpdateInterval -gt 0 -and $meta.TotalLines -gt 0 -and ($meta.TotalLines % $ProgressUpdateInterval) -eq 0) {
                $shouldUpdate = $true
            } elseif ($progressStopwatch.ElapsedMilliseconds -ge 1000) {
                $shouldUpdate = $true
            }
        }
        if (-not $shouldUpdate) { return }

        $statusContext = $Context
        if ([string]::IsNullOrWhiteSpace($statusContext)) {
            $statusContext = if ($meta.CurrentSource) { Split-Path -Leaf $meta.CurrentSource } else { 'input' }
        }
        $status = "Processed {0:N0} hashes (+{1:N0} invalid, {2:N0} skipped, {3:N0} lines) [{4}]" -f $meta.ValidEntries, $meta.InvalidLines, $meta.SkippedLines, $meta.TotalLines, $statusContext
        Write-Progress -Activity $ProgressActivity -Status $status -PercentComplete 0
        if ($EnableCheckpoint -and $plainReader) {
            # NOTE(review): this is the position of the underlying FileStream. The
            # BufferedStream/StreamReader read ahead, so it can lie beyond the last
            # line actually processed - confirm that a resume cannot skip lines.
            $checkpointPosition = if ($plainBaseStream) { $plainBaseStream.Position } else { $plainReader.BaseStream.Position }
            & $saveCheckpoint $checkpointPosition
        }
        $progressStopwatch.Restart()
    }

    # Classifies one input line and routes it to its shard.
    $processHashLine = {
        param(
            [string]$rawLine,
            [string]$prefix
        )
        if ($null -eq $rawLine) { return }
        $trimmed = $rawLine.Trim()
        if ($trimmed.Length -eq 0) { return }
        if ($trimmed.StartsWith('#')) { return }
        if ($ForcePlainText -and $trimmed -match '(?i)\.gz(\s*)$') {
            $meta.TotalLines++
            $meta.SkippedLines++
            return
        }

        # Fast path for bare 32-hex lines: written immediately, without the
        # duplicate check or the progress hook used below.
        if ($rawLine.Length -eq 32 -and $rawLine -match '^[0-9A-Fa-f]{32}$') {
            $fastState = $shardStates[$rawLine.Substring(0, $PrefixLength).ToLowerInvariant()]
            $fastState.Writer.WriteLine($rawLine.ToUpperInvariant())
            $fastState.Count++
            $meta.TotalLines++
            $meta.ValidEntries++
            return
        }

        $meta.TotalLines++

        $parts = $trimmed.Split(':', 2)
        $hashPortion = $parts[0].Trim()
        if (-not [string]::IsNullOrWhiteSpace($prefix)) {
            $hashPortion = ($prefix.Trim() + $hashPortion)
        }
        if ($hashPortion.Length -ne 32 -or $hashPortion -notmatch '^[0-9A-Fa-f]{32}$') {
            # Salvage the first 32-hex run from a noisy field.
            $match = [regex]::Match($hashPortion, '[0-9A-Fa-f]{32}')
            if ($match.Success) { $hashPortion = $match.Value }
        }
        $isHash = ($hashPortion.Length -eq 32) -and ($hashPortion -match '^[0-9A-Fa-f]{32}$')
        if (-not $isHash -or $hashPortion.Length -lt $PrefixLength) {
            & $recordInvalid $trimmed
            & $invokeProgress $false
            return
        }

        $normalizedHash = $hashPortion.ToUpperInvariant()
        $countValue = 0
        if ($parts.Count -gt 1) {
            $meta.LegacyLines++
            $countText = $parts[1].Trim()
            if (-not [string]::IsNullOrWhiteSpace($countText)) {
                $null = [int]::TryParse($countText, [ref]$countValue)
                if ($countValue -lt 0) { $countValue = 0 }
            }
        }

        $prefixKey = $normalizedHash.Substring(0, $PrefixLength).ToLowerInvariant()
        $state = $shardStates[$prefixKey]
        if ($null -eq $state) { $state = & $openShard $prefixKey }

        if ($state.PendingHash -and $state.PendingHash -eq $normalizedHash) {
            # Same hash as the buffered one: remember the highest count seen.
            if ($countValue -gt $state.PendingCount) {
                $state.PendingLine = $normalizedHash
                $state.PendingCount = $countValue
            }
        } else {
            if ($state.PendingLine) {
                $state.Writer.WriteLine($state.PendingLine)
                $state.Count++
                $meta.ValidEntries++
            }
            $state.PendingLine = $normalizedHash
            $state.PendingHash = $normalizedHash
            $state.PendingCount = $countValue
        }
        & $invokeProgress $false
    }

    # Resolves a .gz reference as given or relative to the source's directory;
    # an unresolvable relative reference is recorded as an invalid line.
    $resolveGzipPath = {
        param([string]$pathValue)
        if ([string]::IsNullOrWhiteSpace($pathValue)) { return $null }
        if (Test-Path -LiteralPath $pathValue) { return (Resolve-Path -LiteralPath $pathValue).ProviderPath }
        if ([System.IO.Path]::IsPathRooted($pathValue)) { throw "Gzip file not found: $pathValue" }
        $candidate = Join-Path -Path $sourceBaseDir -ChildPath $pathValue
        if (Test-Path -LiteralPath $candidate) { return (Resolve-Path -LiteralPath $candidate).ProviderPath }
        $meta.TotalLines++
        & $recordInvalid $pathValue
        & $invokeProgress $true
        return $null
    }

    # Streams one .gz file; its base name is prepended to every line's hash part.
    $processGzipFile = {
        param([string]$gzipPath)
        if ($ForcePlainText) { return }
        $resolved = & $resolveGzipPath $gzipPath
        if (-not $resolved) { return }
        $meta.CurrentSource = $gzipPath
        $filePrefix = [System.IO.Path]::GetFileNameWithoutExtension($resolved)
        $gzFileStream = $null
        $gzipStream = $null
        $gzReader = $null
        try {
            $gzFileStream = [System.IO.File]::OpenRead($resolved)
            $gzipStream = New-Object System.IO.Compression.GZipStream($gzFileStream, [System.IO.Compression.CompressionMode]::Decompress)
            $gzBuffered = New-Object System.IO.BufferedStream($gzipStream, 1048576)
            $gzReader = New-Object System.IO.StreamReader($gzBuffered, [System.Text.Encoding]::UTF8, $true, 1048576)
            while ($null -ne ($gzLine = $gzReader.ReadLine())) {
                & $processHashLine $gzLine $filePrefix
            }
        } finally {
            if ($gzReader) { $gzReader.Dispose() }
            if ($gzipStream) { $gzipStream.Dispose() }
            if ($gzFileStream) { $gzFileStream.Dispose() }
        }
        & $invokeProgress $true $gzipPath
    }

    $completed = $false
    try {
        Ensure-Directory $ShardRoot

        # One writer per possible prefix (e.g. 256 for two hex digits).
        $prefixList = New-Object System.Collections.Generic.List[string]
        if ($PrefixLength -le 0) {
            $prefixList.Add('')
        } else {
            $prefixSpace = [long][Math]::Pow(16, $PrefixLength)
            $prefixFormat = 'x' + $PrefixLength
            for ($i = 0L; $i -lt $prefixSpace; $i++) { $prefixList.Add($i.ToString($prefixFormat)) }
        }
        foreach ($knownPrefix in $prefixList) { $null = & $openShard $knownPrefix }
        foreach ($savedPrefix in @($resumeShardLookup.Keys)) {
            if (-not $shardStates.ContainsKey($savedPrefix)) { $null = & $openShard $savedPrefix }
        }

        if ($sourceItem.PSIsContainer) {
            $gzipFiles = Get-ChildItem -LiteralPath $sourceItem.FullName -Filter '*.gz' -File -Recurse | Sort-Object FullName
            if (-not $gzipFiles) { throw "Source directory '$($sourceItem.FullName)' does not contain any .gz files." }
            foreach ($file in $gzipFiles) {
                & $processGzipFile $file.FullName
            }
        } else {
            $meta.CurrentSource = $sourceItem.FullName
            # With -ForcePlainText the mode is fixed; otherwise the first content line decides.
            $mode = if ($ForcePlainText) { 'Plain' } else { $null }
            $fileStream = $null
            $reader = $null
            try {
                $fileStream = [System.IO.File]::Open($sourceItem.FullName, [System.IO.FileMode]::Open, [System.IO.FileAccess]::Read, [System.IO.FileShare]::Read)
                if ($Resume -and $resumeFilePosition -gt 0) {
                    if ($fileStream.Length -lt $resumeFilePosition) {
                        throw "Checkpoint position $resumeFilePosition exceeds source length $($fileStream.Length)."
                    }
                    $fileStream.Seek($resumeFilePosition, [System.IO.SeekOrigin]::Begin) | Out-Null
                }
                $bufferedStream = New-Object System.IO.BufferedStream($fileStream, 1048576)
                $reader = New-Object System.IO.StreamReader($bufferedStream, [System.Text.Encoding]::UTF8, $true, 1048576)
                $plainReader = $reader
                $plainBaseStream = $fileStream

                while ($null -ne ($line = $reader.ReadLine())) {
                    $trimmed = $line.Trim()
                    if ($trimmed.Length -eq 0) { continue }
                    if ($trimmed.StartsWith('#')) { continue }

                    if (-not $mode) {
                        if ($trimmed -like '*.gz') {
                            # An unresolvable reference is already recorded as invalid by the resolver.
                            $resolvedProbe = & $resolveGzipPath $trimmed
                            if (-not $resolvedProbe) { continue }
                            $mode = 'GzList'
                            $meta.CurrentSource = $trimmed
                        } else {
                            $mode = 'Plain'
                        }
                    }

                    if ($mode -eq 'Plain') {
                        & $processHashLine $line $null
                    } elseif ($trimmed -like '*.gz') {
                        # List mode is only reachable without -ForcePlainText.
                        & $processGzipFile $trimmed
                    } else {
                        $meta.TotalLines++
                        & $recordInvalid $trimmed
                        & $invokeProgress $false
                    }
                }
            } finally {
                if ($reader) { $reader.Dispose() }
                if ($fileStream) { $fileStream.Dispose() }
                $plainReader = $null
                $plainBaseStream = $null
            }
        }

        # Emit each shard's buffered line, close the writer and collect its stats.
        foreach ($entry in ($shardStates.GetEnumerator() | Sort-Object Key)) {
            $state = $entry.Value
            if ($state.PendingLine) {
                $state.Writer.WriteLine($state.PendingLine)
                $state.Count++
                $meta.ValidEntries++
                $state.PendingLine = $null
            }
            $state.Writer.Dispose()
            $stats[$entry.Key] = [ordered]@{
                Path  = $state.Path
                Count = $state.Count
            }
        }
        $completed = $true
    } finally {
        if ($null -ne $meta.InvalidWriter) {
            $meta.InvalidWriter.Dispose()
            $meta.InvalidWriter = $null
        }
        if (-not $completed) {
            # Already failing: release the handles best-effort so the original
            # error is the one that propagates.
            foreach ($openState in $shardStates.Values) {
                try { $openState.Writer.Dispose() } catch { }
            }
        }
    }

    if ($meta.ValidEntries -eq 0) { throw 'Source did not contain any valid hashes after processing.' }
    if ($ShowProgress) {
        $status = "Processed {0:N0} hashes (+{1:N0} invalid, {2:N0} skipped, {3:N0} lines)" -f $meta.ValidEntries, $meta.InvalidLines, $meta.SkippedLines, $meta.TotalLines
        Write-Progress -Activity $ProgressActivity -Status $status -Completed
    }

    return [pscustomobject]@{
        TotalEntries      = [long]$meta.ValidEntries
        ShardStats        = $stats
        TotalLines        = [long]$meta.TotalLines
        InvalidLines      = [long]$meta.InvalidLines
        SkippedLines      = [long]$meta.SkippedLines
        LegacyCount       = [long]$meta.LegacyLines
        InvalidSamples    = $meta.InvalidSamples.ToArray()
        InvalidOutputPath = if ($meta.InvalidLines -gt 0 -and $InvalidOutputPath) { $InvalidOutputPath } else { $null }
    }
}
|
|
|
|
function Write-JsonFile {
    # Serialises $Data with ConvertTo-Json (depth 6) and writes the text to $Path
    # as UTF-8 without a byte-order mark, replacing any existing file.
    param(
        [string]$Path,
        [object]$Data
    )

    $utf8NoBom = [System.Text.UTF8Encoding]::new($false)
    # Piped on purpose: -InputObject would serialise collections differently.
    $serialized = $Data | ConvertTo-Json -Depth 6
    [System.IO.File]::WriteAllText($Path, $serialized, $utf8NoBom)
}
|
|
|
|
# Locate the settings file: an explicit -SettingsPath must exist; otherwise
# ElysiumSettings.txt next to this script is used when present.
$resolvedSettingsPath = $null
$elysiumSettings = $null
if (-not $SettingsPath) {
    $defaultSettingsCandidate = Join-Path -Path $PSScriptRoot -ChildPath 'ElysiumSettings.txt'
    if (Test-Path -LiteralPath $defaultSettingsCandidate) {
        $resolvedSettingsPath = (Resolve-Path -LiteralPath $defaultSettingsCandidate).Path
    }
} elseif (Test-Path -LiteralPath $SettingsPath) {
    $resolvedSettingsPath = (Resolve-Path -LiteralPath $SettingsPath).Path
} else {
    throw "Settings file not found at $SettingsPath"
}

# Parse the file when one was found; parse errors are re-thrown with the path attached.
if ($resolvedSettingsPath) {
    try {
        $elysiumSettings = Read-KeyValueSettingsFile -Path $resolvedSettingsPath
    } catch {
        throw "Failed to parse settings file '$resolvedSettingsPath': $($_.Exception.Message)"
    }
}
|
|
|
|
# Parallel transfers need PowerShell 7+ and more than one requested transfer.
$psSupportsParallel = ($PSVersionTable.PSVersion.Major -ge 7)
$effectiveParallelTransfers = [Math]::Max(1, [int]$MaxParallelTransfers)
$parallelTransfersEnabled = $psSupportsParallel -and $effectiveParallelTransfers -gt 1
if ($effectiveParallelTransfers -gt 1 -and -not $psSupportsParallel) {
    Write-Verbose "Parallel transfers requested but PowerShell $($PSVersionTable.PSVersion) does not support ForEach-Object -Parallel; using serial mode."
}

$parallelAzureUploadHelpers = $null
$parallelAzureUploadHelperList = @()
$parallelS3UploadHelpers = $null
$parallelS3UploadHelperList = @()
if ($parallelTransfersEnabled) {
    # Capture the text of every function the upload code calls, keyed by name,
    # plus a Name/Definition list built from the same table.
    $parallelAzureUploadHelpers = @{}
    foreach ($helperName in 'Build-BlobUri', 'Upload-AzureBlob') {
        $parallelAzureUploadHelpers[$helperName] = Get-FunctionDefinitionText $helperName
    }
    $parallelAzureUploadHelperList = foreach ($helper in $parallelAzureUploadHelpers.GetEnumerator()) {
        [pscustomobject]@{ Name = $helper.Key; Definition = $helper.Value }
    }

    $s3HelperNames = @(
        'Get-Bytes',
        'Get-HashHex',
        'HmacSha256',
        'ToHex',
        'GetSignatureKey',
        'UriEncode',
        'BuildCanonicalPath',
        'BuildAuthHeaders',
        'BuildS3Uri',
        'Invoke-S3HttpUpload'
    )
    $parallelS3UploadHelpers = @{}
    foreach ($helperName in $s3HelperNames) {
        $parallelS3UploadHelpers[$helperName] = Get-FunctionDefinitionText $helperName
    }
    $parallelS3UploadHelperList = foreach ($helper in $parallelS3UploadHelpers.GetEnumerator()) {
        [pscustomobject]@{ Name = $helper.Key; Definition = $helper.Value }
    }
}
|
|
|
|
# Apply defaults from settings when caller did not specify overrides.
# A value from the settings file is used only when the matching parameter was not
# bound on the command line ($PSBoundParameters) and the setting is non-empty.
if ($elysiumSettings) {
    if (-not $PSBoundParameters.ContainsKey('StorageProvider')) {
        $providerFromSettings = Get-SettingsValue -Settings $elysiumSettings -Key 'StorageProvider'
        if ($providerFromSettings) { $StorageProvider = $providerFromSettings }
    }
    if (-not $PSBoundParameters.ContainsKey('ManifestVersion')) {
        $manifestFromSettings = Get-SettingsValue -Settings $elysiumSettings -Key 'ManifestVersion'
        if ($manifestFromSettings) { $ManifestVersion = $manifestFromSettings }
    }

    # Only the settings that belong to the effective provider are consulted.
    $providerUpper = if ($StorageProvider) { $StorageProvider.ToUpperInvariant() } else { 'NONE' }
    if ($providerUpper -eq 'AZURE') {
        if (-not $PSBoundParameters.ContainsKey('StorageAccountName')) {
            $storageAccountSetting = Get-SettingsValue -Settings $elysiumSettings -Key 'storageAccountName'
            if ($storageAccountSetting) { $StorageAccountName = $storageAccountSetting }
        }
        if (-not $PSBoundParameters.ContainsKey('ContainerName')) {
            $containerSetting = Get-SettingsValue -Settings $elysiumSettings -Key 'containerName'
            if ($containerSetting) { $ContainerName = $containerSetting }
        }
        if (-not $PSBoundParameters.ContainsKey('SasToken')) {
            $sasSetting = Get-SettingsValue -Settings $elysiumSettings -Key 'sasToken'
            if ($sasSetting) { $SasToken = $sasSetting }
        }
    } elseif ($providerUpper -eq 'S3') {
        if (-not $PSBoundParameters.ContainsKey('S3EndpointUrl')) {
            $endpointSetting = Get-SettingsValue -Settings $elysiumSettings -Key 's3EndpointUrl'
            if ($endpointSetting) { $S3EndpointUrl = $endpointSetting }
        }
        if (-not $PSBoundParameters.ContainsKey('S3BucketName')) {
            $bucketSetting = Get-SettingsValue -Settings $elysiumSettings -Key 's3BucketName'
            if ($bucketSetting) { $S3BucketName = $bucketSetting }
        }
        if (-not $PSBoundParameters.ContainsKey('S3AccessKeyId')) {
            $accessKeySetting = Get-SettingsValue -Settings $elysiumSettings -Key 's3AccessKeyId'
            if ($accessKeySetting) { $S3AccessKeyId = $accessKeySetting }
        }
        if (-not $PSBoundParameters.ContainsKey('S3SecretAccessKey')) {
            $secretKeySetting = Get-SettingsValue -Settings $elysiumSettings -Key 's3SecretAccessKey'
            if ($secretKeySetting) { $S3SecretAccessKey = $secretKeySetting }
        }
        if (-not $PSBoundParameters.ContainsKey('S3Region')) {
            $regionSetting = Get-SettingsValue -Settings $elysiumSettings -Key 's3Region'
            if ($regionSetting) { $S3Region = $regionSetting }
        }
        if (-not $PSBoundParameters.ContainsKey('S3ForcePathStyle')) {
            $forcePathStyleSetting = Get-SettingsValue -Settings $elysiumSettings -Key 's3ForcePathStyle'
            if ($forcePathStyleSetting) {
                try {
                    $S3ForcePathStyle = [System.Convert]::ToBoolean($forcePathStyleSetting)
                } catch {
                    # Keep the current value, but tell the operator the setting was not
                    # understood instead of silently ignoring it.
                    Write-Warning ("Ignoring s3ForcePathStyle value '{0}' from settings: expected 'true' or 'false'." -f $forcePathStyleSetting)
                }
            }
        }
    }
}
|
|
|
|
# -- Argument validation ------------------------------------------------------
# Resolves the source file (unless only uploading), rejects contradictory switch
# combinations, and settles the final value of $OutputRoot.

$resolvedSource = $null
if (-not $UploadOnly) {
    # Test for existence before resolving: Resolve-Path -ErrorAction Stop would fail
    # first on a missing path, so the descriptive message below would never be shown.
    if (-not (Test-Path -LiteralPath $SourcePath)) {
        throw "Source file not found at $SourcePath"
    }
    $resolvedSource = Resolve-Path -LiteralPath $SourcePath -ErrorAction Stop
}

if ($UploadOnly -and $SkipUpload) {
    throw '-UploadOnly cannot be combined with -SkipUpload.'
}
if ($UploadOnly -and $StorageProvider -eq 'None') {
    throw "-UploadOnly requires StorageProvider Azure or S3 so there is an upload to perform."
}

if ($UploadOnly) {
    # Upload-only mode reuses existing artifacts, so the output root must be given
    # and must already exist.
    if (-not $OutputRoot) {
        throw '-OutputRoot must be specified when using -UploadOnly.'
    }
    $resolvedOutputRoot = Resolve-Path -LiteralPath $OutputRoot -ErrorAction Stop
    $OutputRoot = $resolvedOutputRoot.Path
} else {
    if (-not $OutputRoot) {
        # Default to a 'khdb-package' folder beside the source file.
        $defaultRoot = Join-Path -Path (Split-Path -Parent $resolvedSource.Path) -ChildPath 'khdb-package'
        $OutputRoot = $defaultRoot
    }
}
|
|
|
|
# Initialise the state shared by the packaging and upload stages below.

# Local artifact locations under the output root.
$manifestPath = Join-Path -Path $OutputRoot -ChildPath 'manifest.json'
$localShardRoot = Join-Path -Path $OutputRoot -ChildPath 'shards'

# Manifest content and the values reported in the final summary.
$manifestObject = $summaryMessage = $manifestHash = $normalizedShardPrefix = $null
$manifestShards = @()
$totalEntries = $totalLines = $invalidCount = $skippedCount = $totalSizeBytes = 0L

# Checkpoint/resume state; stays disabled unless the split stage enables it.
$checkpointEnabled = $resume = $false
$resumeState = $resolvedCheckpointPath = $null
|
|
|
|
# Produce the shard list, manifest object, manifest hash and summary message.
# With -UploadOnly the artifacts of an earlier run are reloaded from disk; otherwise
# the source is split into shards and a fresh manifest and clean aggregate are written.
if ($UploadOnly) {
    # The shard directory and the manifest must already exist.
    if (-not (Test-Path -LiteralPath $localShardRoot)) {
        throw "UploadOnly requested but shard directory '$localShardRoot' was not found."
    }
    if (-not (Test-Path -LiteralPath $manifestPath)) {
        throw "UploadOnly requested but manifest '$manifestPath' does not exist."
    }

    try {
        $manifestObject = (Get-Content -LiteralPath $manifestPath -Encoding UTF8 -Raw) | ConvertFrom-Json
    } catch {
        throw "Failed to parse manifest '$manifestPath': $($_.Exception.Message)"
    }
    if (-not $manifestObject) {
        throw "Manifest '$manifestPath' is empty or invalid."
    }
    if (-not $manifestObject.shards -or $manifestObject.shards.Count -eq 0) {
        throw "Manifest '$manifestPath' does not contain shard metadata."
    }

    # -LiteralPath keeps characters such as '[' and ']' in the path from being
    # treated as wildcards, matching the other file operations in this script.
    $manifestHash = (Get-FileHash -LiteralPath $manifestPath -Algorithm SHA256).Hash.ToLowerInvariant()

    # Rebuild the shard records from the manifest, taking sizes from the files on disk.
    # A list avoids re-allocating the array for every appended shard.
    $shardRecordList = New-Object 'System.Collections.Generic.List[object]'
    $totalSizeBytes = 0L
    foreach ($entry in ($manifestObject.shards | Sort-Object name)) {
        $name = [string]$entry.name
        if ([string]::IsNullOrWhiteSpace($name)) { continue }
        $localPath = Join-Path -Path $localShardRoot -ChildPath $name
        if (-not (Test-Path -LiteralPath $localPath)) {
            throw "Shard file '$name' listed in manifest was not found under '$localShardRoot'."
        }
        $fileInfo = Get-Item -LiteralPath $localPath
        $totalSizeBytes += $fileInfo.Length
        $shardRecordList.Add([pscustomobject]@{
            name    = $name
            prefix  = [string]$entry.prefix
            entries = if ($null -ne $entry.entries) { [long]$entry.entries } else { 0L }
            size    = [string]$fileInfo.Length
            sha256  = if ($entry.sha256) { [string]$entry.sha256.ToLowerInvariant() } else { '' }
        })
    }
    $manifestShards = $shardRecordList.ToArray()
    if ($manifestShards.Count -eq 0) {
        throw "Manifest '$manifestPath' did not produce any shard records to upload."
    }

    # Counters come from the manifest; missing or zero values fall back to 0.
    $totalEntries = if ($manifestObject.totalEntries) { [long]$manifestObject.totalEntries } else { 0L }
    $totalLines = if ($manifestObject.inputLines) { [long]$manifestObject.inputLines } else { 0L }
    $invalidCount = if ($manifestObject.invalidEntries) { [long]$manifestObject.invalidEntries } else { 0L }
    $skippedCount = if ($manifestObject.skippedEntries) { [long]$manifestObject.skippedEntries } else { 0L }

    # The shard prefix recorded in the manifest takes precedence over -ShardRemotePrefix.
    $normalizedShardPrefix = Get-NormalizedForwardPath -PathValue $ShardRemotePrefix
    $manifestShardPrefix = Get-NormalizedForwardPath -PathValue $manifestObject.shardPrefix
    if ($manifestShardPrefix -and $normalizedShardPrefix -and $manifestShardPrefix -ne $normalizedShardPrefix) {
        Write-Warning ("ShardRemotePrefix '{0}' does not match manifest shardPrefix '{1}'; using manifest value." -f $normalizedShardPrefix, $manifestShardPrefix)
    }
    if ($manifestShardPrefix) {
        $normalizedShardPrefix = $manifestShardPrefix
    }

    Write-Host "UploadOnly requested; reusing existing artifacts under '$OutputRoot'."
    Write-Host ("Manifest SHA256: {0}" -f $manifestHash)

    $summaryMessage = ("Summary: {0} shards, {1} valid hashes, {2} invalid entries, {3} skipped, {4:N0} bytes." -f $manifestShards.Count, $totalEntries, $invalidCount, $skippedCount, $totalSizeBytes)
} else {
    # Checkpointing is enabled only together with -ForcePlainText; an existing
    # checkpoint file switches the run into resume mode.
    if (-not $NoCheckpoint) {
        if (-not $ForcePlainText) {
            Write-Warning 'Checkpointing is only available with -ForcePlainText; continuing without checkpoints.'
        } else {
            if (-not $CheckpointPath) {
                $CheckpointPath = Join-Path -Path $OutputRoot -ChildPath 'khdb.checkpoint.json'
            }
            $resolvedCheckpointPath = [System.IO.Path]::GetFullPath($CheckpointPath)
            $checkpointDirectory = Split-Path -Path $resolvedCheckpointPath -Parent
            if ($checkpointDirectory -and -not (Test-Path -LiteralPath $checkpointDirectory)) {
                [System.IO.Directory]::CreateDirectory($checkpointDirectory) | Out-Null
            }
            if (Test-Path -LiteralPath $resolvedCheckpointPath) {
                try {
                    $resumeState = (Get-Content -LiteralPath $resolvedCheckpointPath -Encoding UTF8 -Raw) | ConvertFrom-Json
                } catch {
                    throw "Failed to parse checkpoint '$resolvedCheckpointPath': $($_.Exception.Message)"
                }
                if (-not $resumeState) { throw "Checkpoint '$resolvedCheckpointPath' is empty or invalid." }
                if ($resumeState.version -and $resumeState.version -ne 1) { throw "Unsupported checkpoint version $($resumeState.version)." }
                $resume = $true
            }
            $checkpointEnabled = $true
        }
    }

    # A checkpoint may only be resumed against the source file it was created for.
    if ($resume -and $resumeState -and $resumeState.sourcePath) {
        $resumeSourcePath = [System.IO.Path]::GetFullPath([string]$resumeState.sourcePath)
        if ($resumeSourcePath -ne $resolvedSource.Path) {
            throw "Checkpoint source '$resumeSourcePath' does not match current source '$($resolvedSource.Path)'."
        }
    }

    # A fresh run needs an empty output root: refuse to reuse a populated one unless
    # -Force is given, in which case its contents are removed. Resumed runs keep it.
    if (Test-Path -LiteralPath $OutputRoot) {
        $startingFresh = -not $resume
        if ($startingFresh) {
            if (-not $Force) {
                $existing = Get-ChildItem -LiteralPath $OutputRoot -Force | Select-Object -First 1
                if ($existing) {
                    throw "Output root '$OutputRoot' already exists and is not empty. Use -Force to overwrite."
                }
            } else {
                Remove-DirectoryContents -Path $OutputRoot
            }
        }
    } else {
        Ensure-Directory $OutputRoot
    }

    Ensure-Directory $localShardRoot

    # Drop a stale invalid-lines report unless a resumed run is continuing it.
    $invalidReportPath = Join-Path -Path $OutputRoot -ChildPath 'invalid-hashes.txt'
    if (-not $resume -and (Test-Path -LiteralPath $invalidReportPath)) { Remove-Item -LiteralPath $invalidReportPath -Force }

    Write-Host "Splitting '$($resolvedSource.Path)' into shard prefix length $ShardSize..."
    $splitResult = Split-KhdbIntoShards -Source $resolvedSource.Path -ShardRoot $localShardRoot -PrefixLength $ShardSize -InvalidOutputPath $invalidReportPath -ShowProgress:$ShowProgress -ProgressUpdateInterval $ProgressUpdateInterval -ProgressActivity 'Preparing KHDB shards' -ForcePlainText:$ForcePlainText -EnableCheckpoint:$checkpointEnabled -CheckpointPath $resolvedCheckpointPath -Resume:$resume -ResumeState $resumeState
    $totalEntries = [long]$splitResult.TotalEntries
    $totalLines = [long]$splitResult.TotalLines
    $invalidCount = [long]$splitResult.InvalidLines
    $skippedCount = [long]$splitResult.SkippedLines
    Write-Host ("Input summary: {0} non-empty line(s) -> {1} valid hash(es), {2} invalid entr(y/ies), {3} skipped." -f $totalLines, $totalEntries, $invalidCount, $skippedCount)
    if ([long]$splitResult.LegacyCount -gt 0) {
        Write-Warning ("Detected {0} legacy HASH:count entries. Output shards and khdb-clean.txt were normalized to hash-only lines for DSInternals compatibility." -f [long]$splitResult.LegacyCount)
    }
    if ($invalidCount -gt 0) {
        if ($splitResult.InvalidOutputPath) {
            Write-Warning ("Invalid lines saved to {0}" -f $splitResult.InvalidOutputPath)
        }
        if ($splitResult.InvalidSamples -and $splitResult.InvalidSamples.Count -gt 0) {
            Write-Warning "Sample invalid lines:"
            foreach ($sample in $splitResult.InvalidSamples) {
                Write-Warning (" {0}" -f $sample)
            }
        }
    } elseif (-not $resume -and (Test-Path -LiteralPath $invalidReportPath)) {
        # No invalid lines were found, so remove the report file if one exists.
        Remove-Item -LiteralPath $invalidReportPath -Force
    }

    # Describe every shard (name, entry count, size, SHA-256) for the manifest.
    # A list avoids re-allocating the array for every appended shard.
    $shardRecordList = New-Object 'System.Collections.Generic.List[object]'
    $totalSizeBytes = 0L

    foreach ($prefix in ($splitResult.ShardStats.Keys | Sort-Object)) {
        $info = $splitResult.ShardStats[$prefix]
        $fileInfo = Get-Item -LiteralPath $info.Path
        $totalSizeBytes += $fileInfo.Length
        # -LiteralPath: shard paths must not be interpreted as wildcard patterns.
        $hash = (Get-FileHash -LiteralPath $info.Path -Algorithm SHA256).Hash.ToLowerInvariant()
        $shardRecordList.Add([pscustomobject]@{
            name    = "$prefix.txt"
            prefix  = $prefix
            entries = $info.Count
            size    = [string]$fileInfo.Length
            sha256  = $hash
        })
    }
    $manifestShards = $shardRecordList.ToArray()

    # Without an explicit -ManifestVersion, use a local timestamp as the version.
    $manifestVersionValue = if ([string]::IsNullOrWhiteSpace($ManifestVersion)) {
        (Get-Date).ToString('yyyyMMdd-HHmmss')
    } else {
        $ManifestVersion
    }

    $normalizedShardPrefix = Get-NormalizedForwardPath -PathValue $ShardRemotePrefix
    $manifestObject = [ordered]@{
        version        = $manifestVersionValue
        generatedAt    = (Get-Date).ToUniversalTime().ToString('o')
        shardSize      = $ShardSize
        shardPrefix    = $normalizedShardPrefix
        totalEntries   = $totalEntries
        inputLines     = $totalLines
        invalidEntries = $invalidCount
        skippedEntries = $skippedCount
        totalShards    = $manifestShards.Count
        totalSize      = [string]$totalSizeBytes
        shards         = $manifestShards
    }

    Write-Host ("Writing manifest to {0}" -f $manifestPath)
    Write-JsonFile -Path $manifestPath -Data $manifestObject
    $manifestHash = (Get-FileHash -LiteralPath $manifestPath -Algorithm SHA256).Hash.ToLowerInvariant()

    Write-Host ("Manifest SHA256: {0}" -f $manifestHash)
    $cleanCombinedPath = Join-Path -Path $OutputRoot -ChildPath 'khdb-clean.txt'
    Write-Host ("Writing cleaned aggregate to {0}..." -f $cleanCombinedPath)
    Merge-ShardsToFile -Manifest $manifestObject -ShardsRoot $localShardRoot -TargetPath $cleanCombinedPath
    $cleanHash = (Get-FileHash -LiteralPath $cleanCombinedPath -Algorithm SHA256).Hash.ToLowerInvariant()
    Write-Host ("Clean KHDB SHA256: {0}" -f $cleanHash)
    $summaryMessage = ("Summary: {0} shards, {1} valid hashes, {2} invalid entries, {3} skipped, {4:N0} bytes." -f $manifestShards.Count, $totalEntries, $invalidCount, $skippedCount, $totalSizeBytes)
}
|
|
|
|
# Normalise the remote manifest key and reject an empty result.
$normalizedManifestRemote = Get-NormalizedForwardPath -PathValue $ManifestRemotePath
$manifestRemoteIsBlank = [string]::IsNullOrEmpty($normalizedManifestRemote)
if ($manifestRemoteIsBlank) {
    throw 'ManifestRemotePath cannot be empty.'
}

# -SkipUpload ends the script here, before any remote transfer is attempted.
if ($SkipUpload) {
    if (-not $UploadOnly) {
        Write-Host "SkipUpload requested; files ready under '$OutputRoot'."
        Write-Host $summaryMessage

        # The package is complete, so a leftover checkpoint file is no longer needed.
        $checkpointFilePresent = $checkpointEnabled -and $resolvedCheckpointPath -and (Test-Path -LiteralPath $resolvedCheckpointPath)
        if ($checkpointFilePresent) {
            Remove-Item -LiteralPath $resolvedCheckpointPath -Force
        }
    }
    # With -UploadOnly this point should never be reached due to earlier validation,
    # but the script still returns defensively.
    return
}
|
|
|
|
# Upload every shard and then the manifest to the selected storage provider.
# The manifest goes last, after the shard transfers. Any provider other than
# Azure or S3 skips the upload.
switch ($StorageProvider.ToUpperInvariant()) {
    'AZURE' {
        if ([string]::IsNullOrWhiteSpace($StorageAccountName)) { throw 'storageAccountName is required for Azure uploads.' }
        if ([string]::IsNullOrWhiteSpace($ContainerName)) { throw 'containerName is required for Azure uploads.' }
        if ([string]::IsNullOrWhiteSpace($SasToken)) { throw 'sasToken is required for Azure uploads.' }

        if ($parallelTransfersEnabled) {
            Write-Host ("Uploading shards to Azure Blob Storage container '{0}' with up to {1} concurrent transfer(s)..." -f $ContainerName, $effectiveParallelTransfers)
            $prefixForParallelUpload = if ([string]::IsNullOrWhiteSpace($normalizedShardPrefix)) { $null } else { $normalizedShardPrefix.Replace('\', '/').Trim('/') }
            # -ErrorAction Stop: a failed shard aborts the run before the manifest is
            # uploaded, as a failure in the serial loop does.
            $manifestShards | ForEach-Object -Parallel {
                # ForEach-Object -Parallel exposes the piped object as $_ and passes no
                # arguments to the script block, so a param() block would stay unbound.
                # Capture it now because $_ is the error record inside catch.
                $entry = $_
                try {
                    # Define the helper functions here when they are not already available.
                    foreach ($helper in $using:parallelAzureUploadHelperList) {
                        if (-not (Get-Command $helper.Name -ErrorAction SilentlyContinue)) {
                            Invoke-Expression $helper.Definition
                        }
                    }
                    $localPath = Join-Path -Path $using:localShardRoot -ChildPath $entry.name
                    $remoteKey = $entry.name.Replace('\', '/').TrimStart('/')
                    if (-not [string]::IsNullOrWhiteSpace($using:prefixForParallelUpload)) {
                        $remoteKey = $using:prefixForParallelUpload + '/' + $remoteKey
                    }
                    Upload-AzureBlob -Account $using:StorageAccountName -Container $using:ContainerName -Sas $using:SasToken -BlobName $remoteKey -FilePath $localPath -ContentType 'text/plain'
                    Write-Host (" -> {0}" -f $remoteKey)
                } catch {
                    throw ("Shard '{0}': {1}" -f $entry.name, $_.Exception.Message)
                }
            } -ThrottleLimit $effectiveParallelTransfers -ErrorAction Stop
        } else {
            Write-Host "Uploading shards to Azure Blob Storage container '$ContainerName'..."
            foreach ($entry in $manifestShards) {
                $localPath = Join-Path -Path $localShardRoot -ChildPath $entry.name
                $remoteKey = Combine-StoragePath -Prefix $normalizedShardPrefix -Name $entry.name
                Write-Host (" -> {0}" -f $remoteKey)
                Upload-AzureBlob -Account $StorageAccountName -Container $ContainerName -Sas $SasToken -BlobName $remoteKey -FilePath $localPath -ContentType 'text/plain'
            }
        }

        Write-Host ("Uploading manifest to {0}" -f $normalizedManifestRemote)
        Upload-AzureBlob -Account $StorageAccountName -Container $ContainerName -Sas $SasToken -BlobName $normalizedManifestRemote -FilePath $manifestPath -ContentType 'application/json'
    }
    'S3' {
        if ([string]::IsNullOrWhiteSpace($S3EndpointUrl)) { throw 's3EndpointUrl is required for S3 uploads.' }
        if ([string]::IsNullOrWhiteSpace($S3BucketName)) { throw 's3BucketName is required for S3 uploads.' }
        if ([string]::IsNullOrWhiteSpace($S3AccessKeyId) -or [string]::IsNullOrWhiteSpace($S3SecretAccessKey)) {
            throw 's3AccessKeyId and s3SecretAccessKey are required for S3 uploads.'
        }

        if ($parallelTransfersEnabled) {
            Write-Host ("Uploading shards to S3 bucket '{0}' with up to {1} concurrent transfer(s)..." -f $S3BucketName, $effectiveParallelTransfers)
            $prefixForParallelUpload = if ([string]::IsNullOrWhiteSpace($normalizedShardPrefix)) { $null } else { $normalizedShardPrefix.Replace('\', '/').Trim('/') }
            # -ErrorAction Stop: a failed shard aborts the run before the manifest is
            # uploaded, as a failure in the serial loop does.
            $manifestShards | ForEach-Object -Parallel {
                # ForEach-Object -Parallel exposes the piped object as $_ and passes no
                # arguments to the script block, so a param() block would stay unbound.
                # Capture it now because $_ is the error record inside catch.
                $entry = $_
                try {
                    # Define the helper functions here when they are not already available.
                    foreach ($helper in $using:parallelS3UploadHelperList) {
                        if (-not (Get-Command $helper.Name -ErrorAction SilentlyContinue)) {
                            Invoke-Expression $helper.Definition
                        }
                    }
                    $localPath = Join-Path -Path $using:localShardRoot -ChildPath $entry.name
                    $remoteKey = $entry.name.Replace('\', '/').TrimStart('/')
                    if (-not [string]::IsNullOrWhiteSpace($using:prefixForParallelUpload)) {
                        $remoteKey = $using:prefixForParallelUpload + '/' + $remoteKey
                    }
                    Invoke-S3HttpUpload -EndpointUrl $using:S3EndpointUrl -Bucket $using:S3BucketName -Key $remoteKey -FilePath $localPath -Region $using:S3Region -AccessKeyId $using:S3AccessKeyId -SecretAccessKey $using:S3SecretAccessKey -ForcePathStyle $using:S3ForcePathStyle -PayloadHash $entry.sha256 -ContentType 'text/plain'
                    Write-Host (" -> {0}" -f $remoteKey)
                } catch {
                    throw ("Shard '{0}': {1}" -f $entry.name, $_.Exception.Message)
                }
            } -ThrottleLimit $effectiveParallelTransfers -ErrorAction Stop
        } else {
            Write-Host "Uploading shards to S3 bucket '$S3BucketName'..."
            foreach ($entry in $manifestShards) {
                $localPath = Join-Path -Path $localShardRoot -ChildPath $entry.name
                $remoteKey = Combine-StoragePath -Prefix $normalizedShardPrefix -Name $entry.name
                Write-Host (" -> {0}" -f $remoteKey)
                Invoke-S3HttpUpload -EndpointUrl $S3EndpointUrl -Bucket $S3BucketName -Key $remoteKey -FilePath $localPath -Region $S3Region -AccessKeyId $S3AccessKeyId -SecretAccessKey $S3SecretAccessKey -ForcePathStyle $S3ForcePathStyle -PayloadHash $entry.sha256 -ContentType 'text/plain'
            }
        }

        Write-Host ("Uploading manifest to {0}" -f $normalizedManifestRemote)
        Invoke-S3HttpUpload -EndpointUrl $S3EndpointUrl -Bucket $S3BucketName -Key $normalizedManifestRemote -FilePath $manifestPath -Region $S3Region -AccessKeyId $S3AccessKeyId -SecretAccessKey $S3SecretAccessKey -ForcePathStyle $S3ForcePathStyle -PayloadHash $manifestHash -ContentType 'application/json'
    }
    default {
        Write-Host "StorageProvider set to 'None'; skipping upload. Files available under '$OutputRoot'."
    }
}
|
|
|
|
# Final report and cleanup.
# Announce a successful upload only when a provider that uploads was selected;
# with any other provider nothing was transferred, so only the summary is printed.
$uploadPerformed = @('AZURE', 'S3') -contains $StorageProvider.ToUpperInvariant()
if ($uploadPerformed) {
    Write-Host "Upload completed successfully."
}
Write-Host $summaryMessage

# The run finished, so a leftover checkpoint file is no longer needed.
if ($checkpointEnabled -and $resolvedCheckpointPath -and (Test-Path -LiteralPath $resolvedCheckpointPath)) {
    Remove-Item -LiteralPath $resolvedCheckpointPath -Force
}
|