Last active
December 16, 2016 09:16
-
-
Save jbgi/8ffcea4e1fd0780e5557cf5800bd33ad to your computer and use it in GitHub Desktop.
"Atomic" file sink through temporary file sink + renaming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file._ | |
import java.util.concurrent.Executor | |
import akka.Done | |
import akka.stream.IOResult | |
import akka.stream.scaladsl.{FileIO, Sink} | |
import akka.util.ByteString | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.Try | |
import scala.util.control.NonFatal | |
/**
 * Policy applied when the destination file of a sink already exists.
 *
 * Extends `Product with Serializable` so that the least upper bound inferred for a
 * mix of the case objects below is exactly `FileCreationOption` (instead of
 * `Product with Serializable with FileCreationOption`).
 */
sealed trait FileCreationOption extends Product with Serializable

/** Report a failure instead of overwriting an already existing destination file. */
case object FailOnExist extends FileCreationOption

/** Overwrite the destination file if it already exists. */
case object ReplaceExisting extends FileCreationOption
object FileSinks {

  /**
   * File sink that writes through a temporary file and only moves it to `destinationFile`
   * once all bytes have been written successfully, so that a partially written
   * `destinationFile` is never observable on the file system.
   *
   * The temporary file is created in the directory of `destinationFile`; its name starts with
   * the destination file name followed by "." and ends with ".tmp".
   *
   * Errors detected by this sink (destination already existing with [[FailOnExist]], temporary
   * file creation failure, write failure reported by the underlying file sink, failure of the
   * final move) are reported through the status of the materialized [[akka.stream.IOResult]].
   * If the future of the underlying file sink itself fails, that failure is propagated and the
   * temporary file is removed on a best-effort basis.
   *
   * Note: the file sink is only created when the first element arrives. If the stream completes
   * without any element, no file is created (and no existence check is performed); the result is
   * then a successful `IOResult` with a count of 0.
   *
   * @param destinationFile path of the file that must only appear once complete
   * @param option          what to do when `destinationFile` already exists; defaults to [[FailOnExist]]
   * @return a sink materializing the outcome of the write followed by the move
   */
  def toCompleteFile(
      destinationFile: Path,
      option: FileCreationOption = FailOnExist): Sink[ByteString, Future[IOResult]] = {
    // Lazy factory is used to be able to use a different temporary file at each run
    // (otherwise processes might be stuck due to a residual tmp file):
    Sink.lazyInit[ByteString, Future[IOResult]](_ => {
      val fileSink =
        if (option == FailOnExist && Files.exists(destinationFile))
          failedSink(new FileAlreadyExistsException(destinationFile.toString))
        else
          try temporaryFileSink(destinationFile, option)
          catch {
            // Case where the temporary file could not be created.
            case NonFatal(e) => failedSink(e)
          }
      Future.successful(fileSink)
    }, () => Future.successful(IOResult.createSuccessful(0)))
      // lazyInit wraps the materialized value in a Future: flatten Future[Future[IOResult]].
      .mapMaterializedValue(_.flatMap(identity)(sameThreadExecutionContext))
  }

  /**
   * Creates the temporary file next to `destinationFile` and returns a sink that writes into it,
   * then moves it to `destinationFile` when the write succeeded.
   * Throws if the temporary file cannot be created.
   */
  private def temporaryFileSink(
      destinationFile: Path,
      option: FileCreationOption): Sink[ByteString, Future[IOResult]] = {
    val tmpDestFile = Files.createTempFile(
      temporaryDirectoryFor(destinationFile),
      s"${destinationFile.getFileName.toString}.",
      ".tmp")

    FileIO.toPath(tmpDestFile).mapMaterializedValue { written =>
      written
        .map { writeResult =>
          val moveResult =
            if (writeResult.wasSuccessful)
              IOResult(writeResult.count, Try(moveIntoPlace(tmpDestFile, destinationFile, option)))
            else writeResult
          // Best effort: no-op after a successful move, removes the leftover otherwise.
          Try(Files.deleteIfExists(tmpDestFile))
          moveResult
        }(sameThreadExecutionContext)
        .andThen {
          // The write future itself failed, so the `map` above never ran: do not leak the tmp file.
          case scala.util.Failure(_) => Try(Files.deleteIfExists(tmpDestFile))
        }(sameThreadExecutionContext)
    }
  }

  /**
   * Directory in which the temporary file is created: the parent of `destinationFile`.
   * `Path.getParent` returns null for a bare relative file name (e.g. "out.bin"); in that case
   * the parent of the absolute path is used instead of passing null to `Files.createTempFile`.
   */
  private def temporaryDirectoryFor(destinationFile: Path): Path =
    Option(destinationFile.getParent)
      .orElse(Option(destinationFile.toAbsolutePath.getParent))
      .getOrElse(throw new IllegalArgumentException(s"No parent directory for: $destinationFile"))

  /** Moves `tmpFile` to `destinationFile`, replacing an existing file only with [[ReplaceExisting]]. */
  private def moveIntoPlace(tmpFile: Path, destinationFile: Path, option: FileCreationOption): Done = {
    option match {
      case FailOnExist     => Files.move(tmpFile, destinationFile)
      case ReplaceExisting => Files.move(tmpFile, destinationFile, StandardCopyOption.REPLACE_EXISTING)
    }
    Done
  }

  /** Sink that cancels immediately and materializes a failed `IOResult` (count 0) holding `exception`. */
  private def failedSink(exception: Throwable): Sink[ByteString, Future[IOResult]] =
    Sink.cancelled[ByteString]
      .mapMaterializedValue(_ => Future.successful(IOResult.createFailed(0, exception)))

  /** Execution context running each callback synchronously on the thread that submits it. */
  private val sameThreadExecutionContext: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor() {
      override def execute(command: Runnable): Unit = command.run()
    })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment