Line data Source code
1 : /*
2 : * GPAC - Multimedia Framework C SDK
3 : *
4 : * Authors: Deniz Ugur, Romain Bouqueau, Sohaib Larbi
5 : * Copyright (c) Motion Spell
6 : * All rights reserved
7 : *
8 : * This file is part of the GPAC/GStreamer wrapper
9 : *
10 : * This GPAC/GStreamer wrapper is free software; you can redistribute it
11 : * and/or modify it under the terms of the GNU Affero General Public License
12 : * as published by the Free Software Foundation; either version 3, or (at
13 : * your option) any later version.
14 : *
15 : * This GPAC/GStreamer wrapper is distributed in the hope that it will be
16 : * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
17 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 : * GNU Affero General Public License for more details.
19 : *
20 : * You should have received a copy of the GNU Affero General Public
21 : * License along with this library; see the file LICENSE. If not, write to
22 : * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 : *
24 : */
25 :
26 : #include "common.h"
27 : #include "gpacmessages.h"
28 : #include "lib/memio.h"
29 : #include "lib/signals.h"
30 :
31 : #include <gio/gio.h>
32 : #include <gpac/mpd.h>
33 : #include <gpac/network.h>
34 :
35 : GST_DEBUG_CATEGORY_STATIC(gpac_dasher);
36 : #define GST_CAT_DEFAULT gpac_dasher
37 :
38 : typedef struct
39 : {
40 : gchar* name; // Name of the file
41 : GFile* file; // GFile object for the file (optional)
42 : GOutputStream* out; // Output stream for the file
43 : } FileAbstract;
44 :
45 : typedef struct
46 : {
47 : // current file being processed
48 : FileAbstract* main_file;
49 : FileAbstract* llhls_file; // for low-latency HLS chunks
50 :
51 : gchar* llhas_template;
52 : gboolean is_manifest;
53 : guint32 dash_state;
54 : const gchar* dst; // destination file path
55 : } DasherCtx;
56 :
57 : void
58 10 : dasher_ctx_init(void** process_ctx)
59 : {
60 10 : *process_ctx = g_new0(DasherCtx, 1);
61 10 : DasherCtx* ctx = (DasherCtx*)*process_ctx;
62 :
63 10 : GST_DEBUG_CATEGORY_INIT(
64 : gpac_dasher, "gpacdasherpp", 0, "GPAC dasher post-processor");
65 10 : }
66 :
67 : void
68 0 : dasher_free_file(FileAbstract* file)
69 : {
70 0 : if (file) {
71 0 : if (file->out) {
72 0 : g_output_stream_close(file->out, NULL, NULL);
73 0 : g_object_unref(file->out);
74 : }
75 0 : if (file->file) {
76 0 : g_object_unref(file->file);
77 : }
78 0 : g_free(file->name);
79 0 : g_free(file);
80 : }
81 0 : }
82 :
83 : void
84 0 : dasher_ctx_free(void* process_ctx)
85 : {
86 0 : DasherCtx* ctx = (DasherCtx*)process_ctx;
87 :
88 : // Free the main file if it exists
89 0 : dasher_free_file(ctx->main_file);
90 0 : dasher_free_file(ctx->llhls_file);
91 :
92 : // Free the llhas template if it exists
93 0 : if (ctx->llhas_template)
94 0 : g_free(ctx->llhas_template);
95 :
96 : // Free the context
97 0 : g_free(ctx);
98 0 : }
99 :
100 : GF_Err
101 10 : dasher_configure_pid(GF_Filter* filter, GF_FilterPid* pid)
102 : {
103 : GPAC_MemOutPIDContext* ctx =
104 10 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
105 10 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
106 10 : GPAC_MemOutPIDFlags udta_flags = gf_filter_pid_get_udta_flags(pid);
107 :
108 : // We'll transfer the data over signals
109 10 : udta_flags |= GPAC_MEMOUT_PID_FLAG_DONT_CONSUME;
110 10 : gf_filter_pid_set_udta_flags(pid, udta_flags);
111 :
112 : const GF_PropertyValue* p =
113 10 : gf_filter_pid_get_property(pid, GF_PROP_PID_IS_MANIFEST);
114 10 : if (p && p->value.uint)
115 2 : dasher_ctx->is_manifest = TRUE;
116 : else
117 8 : dasher_ctx->dash_state = 1;
118 :
119 : // Get the destination path
120 : GF_PropertyValue dst;
121 10 : Bool found = gf_filter_get_arg(filter, "dst", &dst);
122 10 : if (found)
123 20 : dasher_ctx->dst = g_strdup(dst.value.string);
124 :
125 : GPAC_MemOutPrivateContext* fctx =
126 10 : (GPAC_MemOutPrivateContext*)gf_filter_pid_get_alias_udta(pid);
127 10 : if (fctx) {
128 8 : dasher_ctx->dst = fctx->dst;
129 : } else {
130 2 : fctx = (GPAC_MemOutPrivateContext*)gf_filter_get_udta(filter);
131 2 : dasher_ctx->dst = fctx->dst;
132 : }
133 :
134 10 : return GF_OK;
135 : }
136 :
137 : Bool
138 0 : dasher_process_event(GF_Filter* filter, const GF_FilterEvent* evt)
139 : {
140 0 : if (evt->base.type == GF_FEVT_FILE_DELETE) {
141 : GPAC_MemIoContext* io_ctx =
142 0 : (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
143 : GPAC_MemOutPIDContext* ctx =
144 0 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(evt->base.on_pid);
145 0 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
146 :
147 0 : GST_TRACE_OBJECT(io_ctx->sess->element,
148 : "Received file delete event for PID %s: %s",
149 : gf_filter_pid_get_name(evt->base.on_pid),
150 : evt->file_del.url);
151 :
152 0 : gboolean sent = gpac_signal_try_emit(io_ctx->sess->element,
153 : GPAC_SIGNAL_DASHER_DELETE_SEGMENT,
154 0 : evt->file_del.url,
155 : NULL);
156 :
157 0 : if (!sent) {
158 0 : GFile* file = g_file_new_for_path(evt->file_del.url);
159 0 : GError* error = NULL;
160 0 : if (!g_file_delete(file, NULL, &error)) {
161 0 : GST_ELEMENT_WARNING(io_ctx->sess->element,
162 : RESOURCE,
163 : FAILED,
164 : (NULL),
165 : ("Failed to delete file %s: %s",
166 : evt->file_del.url,
167 : error ? error->message : "Unknown error"));
168 0 : if (error)
169 0 : g_error_free(error);
170 : }
171 0 : g_object_unref(file);
172 : }
173 :
174 0 : return GF_TRUE;
175 : }
176 0 : return GF_FALSE;
177 : }
178 :
179 : void
180 482 : dasher_open_close_file(GF_Filter* filter,
181 : GF_FilterPid* pid,
182 : const gchar* name,
183 : gboolean is_llhls)
184 : {
185 482 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
186 : GPAC_MemOutPIDContext* ctx =
187 482 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
188 482 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
189 :
190 : // Get a pointer to the requested file
191 482 : FileAbstract** file = NULL;
192 482 : if (is_llhls) {
193 163 : file = &dasher_ctx->llhls_file;
194 : } else {
195 319 : file = &dasher_ctx->main_file;
196 : }
197 :
198 : // If the file is already open, close it
199 482 : if (*file) {
200 82 : GST_TRACE_OBJECT(io_ctx->sess->element,
201 : "Closing file for PID %s: %s",
202 : gf_filter_pid_get_name(pid),
203 : (*file)->name);
204 82 : if ((*file)->out)
205 82 : g_output_stream_close((*file)->out, NULL, NULL);
206 82 : if ((*file)->file) {
207 0 : g_object_unref((*file)->out);
208 0 : g_object_unref((*file)->file);
209 : }
210 :
211 82 : g_free((*file)->name);
212 82 : g_free(*file);
213 82 : *file = NULL;
214 : }
215 :
216 482 : if (!name)
217 400 : return; // No file to open
218 :
219 82 : GST_TRACE_OBJECT(io_ctx->sess->element,
220 : "Opening new file for PID %s: %s",
221 : gf_filter_pid_get_name(pid),
222 : name);
223 :
224 : // Create a new file
225 82 : *file = g_new0(FileAbstract, 1);
226 82 : (*file)->name = g_strdup(name);
227 :
228 : // Decide on the file flags
229 82 : gboolean has_os = FALSE;
230 82 : if (dasher_ctx->is_manifest) {
231 46 : if (g_strcmp0(name, dasher_ctx->dst) == 0) {
232 2 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
233 : GPAC_SIGNAL_DASHER_MANIFEST,
234 2 : (*file)->name,
235 2 : &(*file)->out);
236 : } else {
237 44 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
238 : GPAC_SIGNAL_DASHER_MANIFEST_VARIANT,
239 44 : (*file)->name,
240 44 : &(*file)->out);
241 : }
242 : } else {
243 36 : if (g_strcmp0(name, dasher_ctx->dst) == 0) {
244 4 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
245 : GPAC_SIGNAL_DASHER_SEGMENT_INIT,
246 4 : (*file)->name,
247 4 : &(*file)->out);
248 : } else {
249 32 : has_os = gpac_signal_try_emit(io_ctx->sess->element,
250 : GPAC_SIGNAL_DASHER_SEGMENT,
251 32 : (*file)->name,
252 32 : &(*file)->out);
253 : }
254 : }
255 :
256 82 : if (!has_os) {
257 : // Create a GFile and GOutputStream for the file
258 0 : (*file)->file = g_file_new_for_path((*file)->name);
259 :
260 0 : GError* error = NULL;
261 0 : (*file)->out = G_OUTPUT_STREAM(g_file_replace(
262 : (*file)->file, NULL, FALSE, G_FILE_CREATE_NONE, NULL, &error));
263 0 : if (!(*file)->out) {
264 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
265 : STREAM,
266 : FAILED,
267 : (NULL),
268 : ("Failed to open output stream for file %s: %s",
269 : (*file)->name,
270 : error ? error->message : "Unknown error"));
271 0 : g_error_free(error);
272 0 : g_object_unref((*file)->file);
273 :
274 : // Reset the current file pointer
275 0 : g_free((*file)->name);
276 0 : g_free(*file);
277 0 : *file = NULL;
278 0 : return;
279 : }
280 : }
281 : }
282 :
283 : void
284 6 : dasher_setup_file(GF_Filter* filter, GF_FilterPid* pid)
285 : {
286 : GPAC_MemOutPIDContext* ctx =
287 6 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
288 6 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
289 6 : const char* dst = dasher_ctx->dst;
290 :
291 : const GF_PropertyValue* p =
292 6 : gf_filter_pid_get_property(pid, GF_PROP_PID_OUTPATH);
293 6 : if (p && p->value.string) {
294 0 : dasher_open_close_file(filter, pid, p->value.string, FALSE);
295 0 : return;
296 : }
297 :
298 6 : if (dasher_ctx->dst) {
299 6 : dasher_open_close_file(filter, pid, dasher_ctx->dst, FALSE);
300 : } else {
301 0 : p = gf_filter_pid_get_property(pid, GF_PROP_PID_FILEPATH);
302 0 : if (!p)
303 0 : p = gf_filter_pid_get_property(pid, GF_PROP_PID_URL);
304 0 : if (p && p->value.string)
305 0 : dasher_open_close_file(filter, pid, p->value.string, FALSE);
306 : }
307 : }
308 :
309 : GF_Err
310 114 : dasher_ensure_file(GF_Filter* filter, GF_FilterPid* pid, gboolean is_llhls)
311 : {
312 114 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
313 : GPAC_MemOutPIDContext* ctx =
314 114 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
315 114 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
316 :
317 114 : FileAbstract** file = NULL;
318 114 : if (is_llhls) {
319 0 : file = &dasher_ctx->llhls_file;
320 : } else {
321 114 : file = &dasher_ctx->main_file;
322 : }
323 :
324 : // If we don't have a file, set it up
325 114 : if (G_UNLIKELY(!(*file))) {
326 0 : GST_ELEMENT_ERROR(
327 : io_ctx->sess->element,
328 : STREAM,
329 : FAILED,
330 : (NULL),
331 : ("Failed to ensure file for PID %s", gf_filter_pid_get_name(pid)));
332 0 : return GF_IO_ERR;
333 : }
334 :
335 114 : if (G_UNLIKELY(!(*file)->out)) {
336 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
337 : STREAM,
338 : FAILED,
339 : (NULL),
340 : ("No output stream for file %s",
341 : (*file)->name ? (*file)->name : "unknown"));
342 0 : gf_filter_abort(filter);
343 0 : return GF_IO_ERR;
344 : }
345 :
346 114 : return GF_OK;
347 : }
348 :
349 : GF_Err
350 114 : dasher_write_data(GF_Filter* filter,
351 : GF_FilterPid* pid,
352 : FileAbstract* file,
353 : const u8* data,
354 : u32 size)
355 : {
356 114 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
357 : GPAC_MemOutPIDContext* ctx =
358 114 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
359 114 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
360 :
361 114 : if (!file || !file->out) {
362 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
363 : STREAM,
364 : FAILED,
365 : (NULL),
366 : ("No output stream for file %s",
367 : file && file->name ? file->name : "unknown"));
368 0 : return GF_IO_ERR;
369 : }
370 :
371 : gssize bytes_written =
372 114 : g_output_stream_write(file->out, data, size, NULL, NULL);
373 114 : if (bytes_written < 0) {
374 0 : GST_ELEMENT_ERROR(io_ctx->sess->element,
375 : STREAM,
376 : FAILED,
377 : (NULL),
378 : ("Failed to write data to output stream for file %s",
379 : file->name ? file->name : "unknown"));
380 0 : return GF_IO_ERR;
381 : }
382 :
383 114 : if (bytes_written < (gssize)size) {
384 0 : GST_ELEMENT_WARNING(io_ctx->sess->element,
385 : STREAM,
386 : FAILED,
387 : (NULL),
388 : ("Partial write to output stream for file %s, "
389 : "expected: %" G_GSIZE_FORMAT
390 : ", written: %" G_GSSIZE_FORMAT,
391 : file->name ? file->name : "unknown",
392 : (gssize)size,
393 : bytes_written));
394 : }
395 :
396 114 : GST_TRACE_OBJECT(io_ctx->sess->element,
397 : "Wrote %s, size: %" G_GSIZE_FORMAT,
398 : file->name ? file->name : "unknown",
399 : (gssize)size);
400 :
401 114 : return GF_OK;
402 : }
403 :
404 : GF_Err
405 640 : dasher_post_process(GF_Filter* filter, GF_FilterPid* pid, GF_FilterPacket* pck)
406 : {
407 640 : GPAC_MemIoContext* io_ctx = (GPAC_MemIoContext*)gf_filter_get_rt_udta(filter);
408 : GPAC_MemOutPIDContext* ctx =
409 640 : (GPAC_MemOutPIDContext*)gf_filter_pid_get_udta(pid);
410 640 : DasherCtx* dasher_ctx = (DasherCtx*)ctx->private_ctx;
411 : const GF_PropertyValue* fname;
412 : const GF_PropertyValue* p;
413 :
414 640 : if (!pck) {
415 526 : if (gf_filter_pid_is_eos(pid) && !gf_filter_pid_is_flush_eos(pid)) {
416 163 : dasher_open_close_file(filter, pid, NULL, FALSE);
417 163 : dasher_open_close_file(filter, pid, NULL, TRUE);
418 : }
419 526 : return GF_OK; // No packet to process
420 : }
421 :
422 : // Check the packet framing
423 : Bool start;
424 : Bool end;
425 114 : gf_filter_pck_get_framing(pck, &start, &end);
426 114 : if (dasher_ctx->dash_state) {
427 68 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENUM);
428 68 : if (p) {
429 32 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENAME);
430 32 : if (p && p->value.string)
431 32 : start = GF_TRUE;
432 : }
433 :
434 68 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_EODS);
435 68 : if (p && p->value.boolean)
436 0 : end = GF_TRUE;
437 : }
438 :
439 114 : if (start) {
440 : // Previous file has ended, move to the next file
441 82 : if (dasher_ctx->main_file)
442 28 : dasher_open_close_file(filter, pid, NULL, FALSE);
443 :
444 : const GF_PropertyValue* ext;
445 : const GF_PropertyValue* fnum;
446 : const GF_PropertyValue* rel;
447 82 : Bool explicit_overwrite = GF_FALSE;
448 82 : const char* name = NULL;
449 82 : fname = ext = NULL;
450 : // file num increased per packet, open new file
451 82 : fnum = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENUM);
452 82 : if (fnum) {
453 32 : fname = gf_filter_pid_get_property(pid, GF_PROP_PID_OUTPATH);
454 32 : ext = gf_filter_pid_get_property(pid, GF_PROP_PID_FILE_EXT);
455 32 : if (!fname)
456 32 : name = dasher_ctx->dst;
457 : }
458 : // filename change at packet start, open new file
459 82 : if (!fname)
460 82 : fname = gf_filter_pck_get_property(pck, GF_PROP_PCK_FILENAME);
461 82 : if (fname)
462 76 : name = fname->value.string;
463 :
464 82 : if (name) {
465 76 : dasher_open_close_file(filter, pid, name, FALSE);
466 6 : } else if (!dasher_ctx->main_file) {
467 6 : dasher_setup_file(filter, pid);
468 : }
469 :
470 82 : fname = gf_filter_pck_get_property(pck, GF_PROP_PCK_LLHAS_TEMPLATE);
471 82 : if (fname) {
472 0 : if (dasher_ctx->llhas_template)
473 0 : g_free(dasher_ctx->llhas_template);
474 0 : dasher_ctx->llhas_template = g_strdup(fname->value.string);
475 : }
476 : }
477 :
478 : // Get the data
479 : u32 size;
480 114 : const u8* data = gf_filter_pck_get_data(pck, &size);
481 114 : if (G_UNLIKELY(!data || !size)) {
482 0 : GST_ELEMENT_WARNING(
483 : io_ctx->sess->element,
484 : STREAM,
485 : FAILED,
486 : (NULL),
487 : ("Received empty packet for PID %s\n", gf_filter_pid_get_name(pid)));
488 0 : return GF_OK;
489 : }
490 :
491 : // We must be actively working on a file by now
492 114 : gpac_return_if_fail(dasher_ensure_file(filter, pid, FALSE));
493 :
494 : // If we are in low-latency HLS mode, we need to handle the llhas chunks
495 114 : p = gf_filter_pck_get_property(pck, GF_PROP_PCK_LLHAS_FRAG_NUM);
496 114 : if (p) {
497 0 : char* llhas_chunkname = gf_mpd_resolve_subnumber(
498 0 : dasher_ctx->llhas_template, dasher_ctx->main_file->name, p->value.uint);
499 0 : dasher_open_close_file(
500 : filter, pid, llhas_chunkname, TRUE); // Open the llhls file
501 0 : gf_free(llhas_chunkname);
502 :
503 : // Ensure the file is set up for llhls
504 0 : gpac_return_if_fail(dasher_ensure_file(filter, pid, TRUE));
505 : }
506 :
507 : // Write the data to the output stream
508 114 : gpac_return_if_fail(
509 : dasher_write_data(filter, pid, dasher_ctx->main_file, data, size));
510 114 : if (dasher_ctx->llhls_file) {
511 : // Write to the llhls file if it exists
512 0 : gpac_return_if_fail(
513 : dasher_write_data(filter, pid, dasher_ctx->llhls_file, data, size));
514 : }
515 :
516 : // Close the output stream
517 114 : if (end && dasher_ctx->is_manifest)
518 46 : dasher_open_close_file(filter, pid, NULL, FALSE);
519 :
520 114 : return GF_OK;
521 : }
522 :
523 : GPAC_FilterPPRet
524 8045 : dasher_consume(GF_Filter* filter, GF_FilterPid* pid, void** outptr)
525 : {
526 : // We don't output any buffers directly
527 8045 : return GPAC_FILTER_PP_RET_NULL;
528 : }
|