Line data Source code
1 : /*
2 : * GPAC - Multimedia Framework C SDK
3 : *
4 : * Authors: Deniz Ugur, Romain Bouqueau, Sohaib Larbi
5 : * Copyright (c) Motion Spell
6 : * All rights reserved
7 : *
8 : * This file is part of the GPAC/GStreamer wrapper
9 : *
10 : * This GPAC/GStreamer wrapper is free software; you can redistribute it
11 : * and/or modify it under the terms of the GNU Affero General Public License
12 : * as published by the Free Software Foundation; either version 3, or (at
13 : * your option) any later version.
14 : *
15 : * This GPAC/GStreamer wrapper is distributed in the hope that it will be
16 : * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
17 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 : * GNU Affero General Public License for more details.
19 : *
20 : * You should have received a copy of the GNU Affero General Public
21 : * License along with this library; see the file LICENSE. If not, write to
22 : * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 : *
24 : */
25 :
26 : #include "common.h"
27 : #include "gpacmessages.h"
28 : #include "lib/memio.h"
29 : #include "lib/signals.h"
30 :
31 : #include <gio/gio.h>
32 : #include <gpac/mpd.h>
33 : #include <gpac/network.h>
34 :
35 : GST_DEBUG_CATEGORY_STATIC(gpac_dasher);
36 : #define GST_CAT_DEFAULT gpac_dasher
37 :
38 : typedef struct
39 : {
40 : gchar* name; // Name of the file
41 : GFile* file; // GFile object for the file (optional)
42 : GOutputStream* out; // Output stream for the file
43 : } FileAbstract;
44 :
45 : typedef struct
46 : {
47 : // current file being processed
48 : FileAbstract* main_file;
49 : FileAbstract* llhls_file; // for low-latency HLS chunks
50 :
51 : gchar* llhas_template;
52 : gboolean is_manifest;
53 : guint32 dash_state;
54 : gchar* original_dst;
55 : const gchar* dst; // destination file path
56 : } DasherCtx;
57 :
58 : void
59 10 : dasher_ctx_init(void** process_ctx)
60 : {
61 10 : *process_ctx = g_new0(DasherCtx, 1);
62 10 : DasherCtx* ctx = (DasherCtx*)*process_ctx;
63 :
64 10 : GST_DEBUG_CATEGORY_INIT(
65 : gpac_dasher, "gpacdasherpp", 0, "GPAC dasher post-processor");
66 10 : }
67 :
68 : void
69 0 : dasher_free_file(FileAbstract* file)
70 : {
71 0 : if (file) {
72 0 : if (file->out) {
73 0 : g_output_stream_close(file->out, NULL, NULL);
74 0 : g_object_unref(file->out);
75 : }
76 0 : if (file->file) {
77 0 : g_object_unref(file->file);
78 : }
79 0 : g_free(file->name);
80 0 : g_free(file);
81 : }
82 0 : }
83 :
84 : void
85 0 : dasher_ctx_free(void* process_ctx)
86 : {
87 0 : DasherCtx* ctx = (DasherCtx*)process_ctx;
88 :
89 : // Free the main file if it exists
90 0 : dasher_free_file(ctx->main_file);
91 0 : dasher_free_file(ctx->llhls_file);
92 :
93 : // Free the llhas template if it exists
94 0 : if (ctx->llhas_template)
95 0 : g_free(ctx->llhas_template);
96 :
97 : // Free the context
98 0 : g_free(ctx->original_dst);
99 0 : g_free(ctx);
100 0 : }
101 :
102 : GF_Err
103 10 : dasher_configure_pid(GF_Filter* filter, GF_FilterPid* pid)
104 : {
105 : GPAC_MemOutPIDContext* ctx =
106 10 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
107 10 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
108 10 : GPAC_MemOutPIDFlags udta_flags = gf_filter_pid_get_udta_flags(pid);
109 :
110 : // We'll transfer the data over signals
111 10 : udta_flags |= GPAC_MEMOUT_PID_FLAG_DONT_CONSUME;
112 10 : gf_filter_pid_set_udta_flags(pid, udta_flags);
113 :
114 : const GF_PropertyValue* p =
115 10 : gf_filter_pid_get_property(pid, GF_PROP_PID_IS_MANIFEST);
116 10 : if (p && p->value.uint)
117 2 : dasher_ctx->is_manifest = TRUE;
118 : else
119 8 : dasher_ctx->dash_state = 1;
120 :
121 : // Get the destination path
122 : GF_PropertyValue dst;
123 10 : Bool found = gf_filter_get_arg(filter, "dst", &dst);
124 10 : if (found) {
125 10 : if (dasher_ctx->original_dst)
126 0 : g_free(dasher_ctx->original_dst);
127 20 : dasher_ctx->original_dst = g_strdup(dst.value.string);
128 : }
129 :
130 : GPAC_MemOutPrivateContext* fctx =
131 10 : (GPAC_MemOutPrivateContext*)gf_filter_pid_get_alias_udta(pid);
132 10 : if (fctx) {
133 8 : dasher_ctx->dst = fctx->dst;
134 : } else {
135 2 : fctx = (GPAC_MemOutPrivateContext*)gf_filter_get_udta(filter);
136 2 : dasher_ctx->dst = fctx->dst;
137 : }
138 :
139 10 : return GF_OK;
140 : }
141 :
142 : Bool
143 0 : dasher_process_event(GF_Filter* filter, const GF_FilterEvent* evt)
144 : {
145 0 : if (evt->base.type == GF_FEVT_FILE_DELETE) {
146 : GPAC_MemIoContext* io_ctx =
147 0 : (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
148 : GPAC_MemOutPIDContext* ctx =
149 0 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(evt->base.on_pid);
150 0 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
151 :
152 0 : GST_TRACE_OBJECT(io_ctx->sess->element,
153 : "Received file delete event for PID %s: %s",
154 : gf_filter_pid_get_name(evt->base.on_pid),
155 : evt->file_del.url);
156 :
157 0 : gboolean sent = gpac_signal_try_emit(io_ctx->sess->element,
158 : GPAC_SIGNAL_DASHER_DELETE_SEGMENT,
159 0 : evt->file_del.url,
160 : NULL);
161 :
162 0 : if (!sent) {
163 0 : GFile* file = g_file_new_for_path(evt->file_del.url);
164 0 : GError* error = NULL;
165 0 : if (!g_file_delete(file, NULL, &error)) {
166 0 : GST_ELEMENT_WARNING(io_ctx->sess->element,
167 : RESOURCE,
168 : FAILED,
169 : (NULL),
170 : ("Failed to delete file %s: %s",
171 : evt->file_del.url,
172 : error ? error->message : "Unknown error"));
173 0 : if (error)
174 0 : g_error_free(error);
175 : }
176 0 : g_object_unref(file);
177 : }
178 :
179 0 : return GF_TRUE;
180 : }
181 0 : return GF_FALSE;
182 : }
183 :
184 : void
185 472 : dasher_open_close_file(GF_Filter* filter,
186 : GF_FilterPid* pid,
187 : const gchar* name,
188 : gboolean is_llhls)
189 : {
190 472 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
191 : GPAC_MemOutPIDContext* ctx =
192 472 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
193 472 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
194 :
195 : // Get a pointer to the requested file
196 472 : FileAbstract** file = NULL;
197 472 : if (is_llhls) {
198 146 : file = &dasher_ctx->llhls_file;
199 : } else {
200 326 : file = &dasher_ctx->main_file;
201 : }
202 :
203 : // If the file is already open, close it
204 472 : if (*file) {
205 94 : GST_TRACE_OBJECT(io_ctx->sess->element,
206 : "Closing file for PID %s: %s",
207 : gf_filter_pid_get_name(pid),
208 : (*file)->name);
209 94 : if ((*file)->out)
210 94 : g_output_stream_close((*file)->out, NULL, NULL);
211 94 : if ((*file)->file) {
212 0 : g_object_unref((*file)->out);
213 0 : g_object_unref((*file)->file);
214 : }
215 :
216 94 : g_free((*file)->name);
217 94 : g_free(*file);
218 94 : *file = NULL;
219 : }
220 :
221 472 : if (!name)
222 378 : return; // No file to open
223 :
224 : // Create a new file
225 94 : *file = g_new0(FileAbstract, 1);
226 :
227 94 : g_assert(dasher_ctx->original_dst);
228 94 : gchar* base_dir = g_path_get_dirname(dasher_ctx->original_dst);
229 94 : gchar* canonical_base_dir = g_path_is_absolute(base_dir)
230 0 : ? g_strdup(base_dir)
231 94 : : g_canonicalize_filename(base_dir, NULL);
232 94 : (*file)->name = g_path_is_absolute(name)
233 0 : ? g_strdup(name)
234 94 : : g_canonicalize_filename(name, canonical_base_dir);
235 94 : g_free(canonical_base_dir);
236 94 : g_free(base_dir);
237 :
238 94 : GST_TRACE_OBJECT(io_ctx->sess->element,
239 : "Opening new file for PID %s: %s",
240 : gf_filter_pid_get_name(pid),
241 : (*file)->name);
242 :
243 : // Decide on the file flags
244 94 : gboolean has_os = FALSE;
245 94 : if (dasher_ctx->is_manifest) {
246 58 : if (g_strcmp0(name, dasher_ctx->dst) == 0) {
247 2 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
248 : GPAC_SIGNAL_DASHER_MANIFEST,
249 2 : (*file)->name,
250 2 : &(*file)->out);
251 : } else {
252 56 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
253 : GPAC_SIGNAL_DASHER_MANIFEST_VARIANT,
254 56 : (*file)->name,
255 56 : &(*file)->out);
256 : }
257 : } else {
258 36 : if (g_strcmp0(name, dasher_ctx->dst) == 0) {
259 4 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
260 : GPAC_SIGNAL_DASHER_SEGMENT_INIT,
261 4 : (*file)->name,
262 4 : &(*file)->out);
263 : } else {
264 32 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
265 : GPAC_SIGNAL_DASHER_SEGMENT,
266 32 : (*file)->name,
267 32 : &(*file)->out);
268 : }
269 : }
270 :
271 94 : if (!has_os) {
272 : // Create a GFile and GOutputStream for the file
273 0 : (*file)->file = g_file_new_for_path((*file)->name);
274 :
275 0 : GError* error = NULL;
276 0 : (*file)->out = G_OUTPUT_STREAM(g_file_replace(
277 : (*file)->file, NULL, FALSE, G_FILE_CREATE_NONE, NULL, &error));
278 0 : if (!(*file)->out) {
279 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
280 : STREAM,
281 : FAILED,
282 : (NULL),
283 : ("Failed to open output stream for file %s: %s",
284 : (*file)->name,
285 : error ? error->message : "Unknown error"));
286 0 : g_error_free(error);
287 0 : g_object_unref((*file)->file);
288 :
289 : // Reset the current file pointer
290 0 : g_free((*file)->name);
291 0 : g_free(*file);
292 0 : *file = NULL;
293 0 : return;
294 : }
295 : }
296 : }
297 :
298 : void
299 6 : dasher_setup_file(GF_Filter* filter, GF_FilterPid* pid)
300 : {
301 : GPAC_MemOutPIDContext* ctx =
302 6 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
303 6 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
304 :
305 : const GF_PropertyValue* p =
306 6 : gf_filter_pid_get_property(pid, GF_PROP_PID_OUTPATH);
307 6 : if (p && p->value.string) {
308 0 : dasher_open_close_file(filter, pid, p->value.string, FALSE);
309 0 : return;
310 : }
311 :
312 6 : if (dasher_ctx->dst) {
313 6 : dasher_open_close_file(filter, pid, dasher_ctx->dst, FALSE);
314 : } else {
315 0 : p = gf_filter_pid_get_property(pid, GF_PROP_PID_FILEPATH);
316 0 : if (!p)
317 0 : p = gf_filter_pid_get_property(pid, GF_PROP_PID_URL);
318 0 : if (p && p->value.string)
319 0 : dasher_open_close_file(filter, pid, p->value.string, FALSE);
320 : }
321 : }
322 :
323 : GF_Err
324 126 : dasher_ensure_file(GF_Filter* filter, GF_FilterPid* pid, gboolean is_llhls)
325 : {
326 126 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
327 : GPAC_MemOutPIDContext* ctx =
328 126 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
329 126 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
330 :
331 126 : FileAbstract** file = NULL;
332 126 : if (is_llhls) {
333 0 : file = &dasher_ctx->llhls_file;
334 : } else {
335 126 : file = &dasher_ctx->main_file;
336 : }
337 :
338 : // If we don't have a file, set it up
339 126 : if (G_UNLIKELY(!(*file))) {
340 0 : GST_ELEMENT_ERROR(
341 : io_ctx->sess->element,
342 : STREAM,
343 : FAILED,
344 : (NULL),
345 : ("Failed to ensure file for PID %s", gf_filter_pid_get_name(pid)));
346 0 : return GF_IO_ERR;
347 : }
348 :
349 126 : if (G_UNLIKELY(!(*file)->out)) {
350 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
351 : STREAM,
352 : FAILED,
353 : (NULL),
354 : ("No output stream for file %s",
355 : (*file)->name ? (*file)->name : "unknown"));
356 0 : gf_filter_abort(filter);
357 0 : return GF_IO_ERR;
358 : }
359 :
360 126 : return GF_OK;
361 : }
362 :
363 : GF_Err
364 126 : dasher_write_data(GF_Filter* filter,
365 : GF_FilterPid* pid,
366 : FileAbstract* file,
367 : const u8* data,
368 : u32 size)
369 : {
370 126 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
371 : GPAC_MemOutPIDContext* ctx =
372 126 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
373 126 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
374 :
375 126 : if (!file || !file->out) {
376 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
377 : STREAM,
378 : FAILED,
379 : (NULL),
380 : ("No output stream for file %s",
381 : file && file->name ? file->name : "unknown"));
382 0 : return GF_IO_ERR;
383 : }
384 :
385 : gssize bytes_written =
386 126 : g_output_stream_write(file->out, data, size, NULL, NULL);
387 126 : if (bytes_written < 0) {
388 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
389 : STREAM,
390 : FAILED,
391 : (NULL),
392 : ("Failed to write data to output stream for file %s",
393 : file->name ? file->name : "unknown"));
394 0 : return GF_IO_ERR;
395 : }
396 :
397 126 : if (bytes_written < (gssize)size) {
398 0 : GST_ELEMENT_WARNING(io_ctx->sess->element,
399 : STREAM,
400 : FAILED,
401 : (NULL),
402 : ("Partial write to output stream for file %s, "
403 : "expected: %" G_GSIZE_FORMAT
404 : ", written: %" G_GSSIZE_FORMAT,
405 : file->name ? file->name : "unknown",
406 : (gssize)size,
407 : bytes_written));
408 : }
409 :
410 126 : GST_TRACE_OBJECT(io_ctx->sess->element,
411 : "Wrote %s, size: %" G_GSIZE_FORMAT,
412 : file->name ? file->name : "unknown",
413 : (gssize)size);
414 :
415 126 : return GF_OK;
416 : }
417 :
418 : GF_Err
419 700 : dasher_post_process(GF_Filter* filter, GF_FilterPid* pid, GF_FilterPacket* pck)
420 : {
421 700 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
422 : GPAC_MemOutPIDContext* ctx =
423 700 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
424 700 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
425 : const GF_PropertyValue* fname;
426 : const GF_PropertyValue* p;
427 :
428 700 : if (!pck) {
429 574 : if (gf_filter_pid_is_eos(pid) && !gf_filter_pid_is_flush_eos(pid)) {
430 146 : dasher_open_close_file(filter, pid, NULL, FALSE);
431 146 : dasher_open_close_file(filter, pid, NULL, TRUE);
432 : }
433 574 : return GF_OK; // No packet to process
434 : }
435 :
436 : // Check the packet framing
437 : Bool start;
438 : Bool end;
439 126 : gf_filter_pck_get_framing(pck, &start, &end);
440 126 : if (dasher_ctx->dash_state) {
441 68 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENUM);
442 68 : if (p) {
443 32 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENAME);
444 32 : if (p && p->value.string)
445 32 : start = GF_TRUE;
446 : }
447 :
448 68 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_EODS);
449 68 : if (p && p->value.boolean)
450 0 : end = GF_TRUE;
451 : }
452 :
453 126 : if (start) {
454 : // Previous file has ended, move to the next file
455 94 : if (dasher_ctx->main_file)
456 28 : dasher_open_close_file(filter, pid, NULL, FALSE);
457 :
458 : const GF_PropertyValue* ext;
459 : const GF_PropertyValue* fnum;
460 : const GF_PropertyValue* rel;
461 94 : Bool explicit_overwrite = GF_FALSE;
462 94 : const char* name = NULL;
463 94 : fname = ext = NULL;
464 : // file num increased per packet, open new file
465 94 : fnum = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENUM);
466 94 : if (fnum) {
467 32 : fname = gf_filter_pid_get_property(pid, GF_PROP_PID_OUTPATH);
468 32 : ext = gf_filter_pid_get_property(pid, GF_PROP_PID_FILE_EXT);
469 32 : if (!fname)
470 32 : name = dasher_ctx->dst;
471 : }
472 : // filename change at packet start, open new file
473 94 : if (!fname)
474 94 : fname = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENAME);
475 94 : if (fname)
476 88 : name = fname->value.string;
477 :
478 94 : if (name) {
479 88 : dasher_open_close_file(filter, pid, name, FALSE);
480 6 : } else if (!dasher_ctx->main_file) {
481 6 : dasher_setup_file(filter, pid);
482 : }
483 :
484 94 : fname = gf_filter_pck_get_property(pck, GF_PROP_PCK_LLHAS_TEMPLATE);
485 94 : if (fname) {
486 0 : if (dasher_ctx->llhas_template)
487 0 : g_free(dasher_ctx->llhas_template);
488 0 : dasher_ctx->llhas_template = g_strdup(fname->value.string);
489 : }
490 : }
491 :
492 : // Get the data
493 : u32 size;
494 126 : const u8* data = gf_filter_pck_get_data(pck, &size);
495 126 : if (G_UNLIKELY(!data || !size)) {
496 0 : GST_ELEMENT_WARNING(
497 : io_ctx->sess->element,
498 : STREAM,
499 : FAILED,
500 : (NULL),
501 : ("Received empty packet for PID %s\n", gf_filter_pid_get_name(pid)));
502 0 : return GF_OK;
503 : }
504 :
505 : // We must be actively working on a file by now
506 126 : gpac_return_if_fail(dasher_ensure_file(filter, pid, FALSE));
507 :
508 : // If we are in low-latency HLS mode, we need to handle the llhas chunks
509 126 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_LLHAS_FRAG_NUM);
510 126 : if (p) {
511 0 : char* llhas_chunkname = gf_mpd_resolve_subnumber(
512 0 : dasher_ctx->llhas_template, dasher_ctx->main_file->name, p->value.uint);
513 0 : dasher_open_close_file(
514 : filter, pid, llhas_chunkname, TRUE); // Open the llhls file
515 0 : gf_free(llhas_chunkname);
516 :
517 : // Ensure the file is set up for llhls
518 0 : gpac_return_if_fail(dasher_ensure_file(filter, pid, TRUE));
519 : }
520 :
521 : // Write the data to the output stream
522 126 : gpac_return_if_fail(
523 : dasher_write_data(filter, pid, dasher_ctx->main_file, data, size));
524 126 : if (dasher_ctx->llhls_file) {
525 : // Write to the llhls file if it exists
526 0 : gpac_return_if_fail(
527 : dasher_write_data(filter, pid, dasher_ctx->llhls_file, data, size));
528 : }
529 :
530 : // Close the output stream
531 126 : if (end && dasher_ctx->is_manifest)
532 58 : dasher_open_close_file(filter, pid, NULL, FALSE);
533 :
534 126 : return GF_OK;
535 : }
536 :
537 : GPAC_FilterPPRet
538 8050 : dasher_consume(GF_Filter* filter, GF_FilterPid* pid, void** outptr)
539 : {
540 : // We don't output any buffers directly
541 8050 : return GPAC_FILTER_PP_RET_NULL;
542 : }
|