Skip to content

Handler

Handler

Provides a file handler class that saves files directly to disk.

This module provides a file handler class that saves files directly to disk. It is used by the Volatility3 CLI to save files to disk.

Example

The file handler class can be used as follows:

$ python3
>>> from volatility3.cli import create_file_handler
>>> file_handler = create_file_handler("output")
>>> file = file_handler("test.txt")
>>> file.write(b"Hello, world!")
>>> file.close()
Todo
  • For now, this module only provides a file handler class that saves files directly to disk. In the future, it could be extended to provide other file handlers as well.

create_file_handler(output_dir, *, collision_policy='fail', run_id=None, use_run_subdirectory=True, temp_parent=None)

Create a file handler class that saves files directly to disk.

Parameters:

Name Type Description Default
output_dir str

The base directory for extracted artefacts. Unless use_run_subdirectory is false, <output_dir>/<run_id>/. Must be set for all modes.

required
collision_policy OutputCollisionPolicy

Behaviour when target file already exists: "fail" (default), "unique", "overwrite".

'fail'
run_id str

Optional identifier used when use_run_subdirectory=True. If omitted, a random UUID is generated.

None
use_run_subdirectory bool

When True (historic default), artefacts live directly under <output>/<run_id>. When False, output_dir is treated as the final extraction directory without extra nesting.

True
temp_parent str

Directory for staged *.vol3 temps. Defaults to the active extraction folder (legacy behaviour matched when nested).

None

Returns: type: A file handler class that saves files directly to disk.

Source code in pydfirram/core/handler.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def create_file_handler(
    output_dir: Optional[str],
    *,
    collision_policy: OutputCollisionPolicy = "fail",
    run_id: Optional[str] = None,
    use_run_subdirectory: bool = True,
    temp_parent: Optional[str] = None,
) -> type:
    """Create a file handler class that saves files directly to disk.

    Args:
        output_dir (str): The base directory for extracted artefacts. Unless
                          ``use_run_subdirectory`` is false, ``<output_dir>/<run_id>/``.
                          Must be set for all modes.
        collision_policy (OutputCollisionPolicy): Behaviour when target file
                          already exists: "fail" (default), "unique", "overwrite".
        run_id (str): Optional identifier used when ``use_run_subdirectory=True``.
                      If omitted, a random UUID is generated.
        use_run_subdirectory (bool): When ``True`` (historic default), artefacts
                                       live directly under ``<output>/<run_id>``.
                                       When ``False``, ``output_dir`` is treated as the
                                       final extraction directory without extra nesting.
        temp_parent (str): Directory for staged ``*.vol3`` temps. Defaults to the
                           active extraction folder (legacy behaviour matched when nested).
    Returns:
        type: A file handler class that saves files directly to disk.
    """
    if collision_policy not in {"fail", "unique", "overwrite"}:
        raise InvalidPluginArgumentError(
            "Politique de collision invalide. Valeurs supportees: "
            "'fail', 'unique', 'overwrite'."
        )

    effective_run_id = sanitize_run_identifier(run_id) if run_id else uuid.uuid4().hex
    if use_run_subdirectory:
        run_output_dir = (
            os.path.join(output_dir, effective_run_id)
            if output_dir is not None
            else None
        )
    else:
        run_output_dir = output_dir

    staging_parent = temp_parent if temp_parent is not None else run_output_dir

    class CLIFileHandler(V3FileHandlerInterface): # type: ignore
        """The FileHandler from Volatility3 CLI.
        """
        def _get_final_filename(self) -> str:
            """Gets the final filename for the saved file."""
            if run_output_dir is None:
                raise OutputHandlingError(
                    "Le dossier de sortie n'est pas configure correctement."
                )

            os.makedirs(run_output_dir, exist_ok=True)
            if self.preferred_filename is None:
                raise OutputHandlingError(
                    "Le plugin n'a pas fourni de nom de fichier de sortie."
                )

            safe_filename = _sanitize_filename(self.preferred_filename)
            output_filename = os.path.join(run_output_dir, safe_filename)
            if collision_policy == "unique":
                return _build_unique_path(output_filename)
            return output_filename

        def close(self) -> None:
            """ V3FileHandlerInterface require to implement this method """

    class CLIDirectFileHandler(CLIFileHandler):
        """A file handler class that saves files directly to disk.
        """
        def __init__(self, filename: str) -> None:
            if run_output_dir is None:
                raise OutputHandlingError(
                    "Le dossier de sortie n'est pas configure correctement."
                )
            if staging_parent is None:
                raise OutputHandlingError(
                    "Le repertoire temporaire d'extraction est indetermine."
                )
            os.makedirs(run_output_dir, exist_ok=True)
            os.makedirs(staging_parent, exist_ok=True)
            fd, temp_name = tempfile.mkstemp(
                suffix  = ".vol3",
                prefix  = "tmp_",
                dir     = staging_parent,
            )

            # allow `io.open()` without using `with` context
            # pylint: disable=R1732
            self._file = io.open(fd, mode="w+b")
            CLIFileHandler.__init__(self, _sanitize_filename(filename)) # type: ignore

            for attr in dir(self._file):
                if not attr.startswith("_") and attr not in [
                    "closed",
                    "close",
                    "mode",
                    "name",
                ]:
                    setattr(self, attr, getattr(self._file, attr))

            self._name = temp_name

        def __getattr__(self, item: Any) -> Any:
            return getattr(self._file, item)

        ## properties

        @property
        def closed(self) -> bool:
            """Returns whether the file is closed."""
            return self._file.closed

        @property
        def mode(self) -> str:
            """Returns the mode of the file."""
            return self._file.mode

        @property
        def name(self) -> str:
            """Returns the name of the file."""
            return self._file.name

        ## methods

        def close(self) -> None:
            """Closes and commits the file
            by moving the temporary file to the correct name.
            """
            # Don't overcommit
            if self._file.closed:
                return
            self._file.close()
            output_filename = self._get_final_filename()
            if collision_policy == "overwrite":
                os.replace(self._name, output_filename)
                return

            if os.path.exists(output_filename):
                os.unlink(self._name)
                if collision_policy == "fail":
                    raise ArtifactAlreadyExistsError(
                        "Un artefact existe deja pour cette extraction: "
                        f"{output_filename}. Modifiez la politique de collision "
                        "ou le dossier de sortie."
                    )
                raise OutputHandlingError(
                    f"Politique de collision inconnue: {collision_policy}"
                )

            os.rename(self._name, output_filename)

    return CLIDirectFileHandler

Politique d'extraction des fichiers

Le handler applique une politique forensic-safe par défaut :

  • Les sorties sont écrites dans un sous-répertoire dédié à l'exécution : <output_dir>/<run_id>/.
  • run_id est généré automatiquement si non fourni.
  • Les noms de fichiers fournis par les plugins sont assainis pour empêcher les chemins malicieux (path traversal).
  • Le comportement en cas de collision est configurable via collision_policy :
  • fail (défaut) : lève FileExistsError si le fichier existe déjà.
  • unique : crée un nouveau nom (artifact.txt, artifact-1.txt, etc.).
  • overwrite : autorise explicitement l'écrasement.

Exemple :

from pydfirram.core.handler import create_file_handler

handler_cls = create_file_handler(
    "/tmp/pydfirram-output",
    collision_policy="unique",
)