Line data Source code
1 : /*
2 : * GPAC - Multimedia Framework C SDK
3 : *
4 : * Authors: Jean Le Feuvre
5 : * Copyright (c) Telecom ParisTech 2018-2021
6 : * All rights reserved
7 : *
8 : * This file is part of GPAC / ROUTE (ATSC3, DVB-I) input filter
9 : *
10 : * GPAC is free software; you can redistribute it and/or modify
11 : * it under the terms of the GNU Lesser General Public License as published by
12 : * the Free Software Foundation; either version 2, or (at your option)
13 : * any later version.
14 : *
15 : * GPAC is distributed in the hope that it will be useful,
16 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 : * GNU Lesser General Public License for more details.
19 : *
20 : * You should have received a copy of the GNU Lesser General Public
21 : * License along with this library; see the file COPYING. If not, write to
22 : * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 : *
24 : */
25 :
26 : #include <gpac/filters.h>
27 : #include <gpac/route.h>
28 : #include <gpac/network.h>
29 : #include <gpac/thread.h>
30 :
31 : #ifndef GPAC_DISABLE_ROUTE
32 :
33 : typedef struct
34 : {
35 : u32 sid;
36 : u32 tsi;
37 : GF_FilterPid *opid;
38 : } TSI_Output;
39 :
40 : typedef struct
41 : {
42 : GF_FilterPid *opid;
43 : char *seg_name;
44 : } SegInfo;
45 :
46 : enum
47 : {
48 : ROUTEIN_REPAIR_NO = 0,
49 : ROUTEIN_REPAIR_SIMPLE,
50 : ROUTEIN_REPAIR_STRICT,
51 : ROUTEIN_REPAIR_FULL,
52 : };
53 :
54 : typedef struct
55 : {
56 : //options
57 : char *src, *ifce, *odir;
58 : Bool gcache, kc, skipr, reorder, fullseg;
59 : u32 buffer, timeout, stats, max_segs, tsidbg, rtimeout, nbcached, repair;
60 : s32 tunein, stsi;
61 :
62 : //internal
63 : GF_Filter *filter;
64 : GF_DownloadManager *dm;
65 :
66 : char *clock_init_seg;
67 : GF_ROUTEDmx *route_dmx;
68 : u32 tune_service_id;
69 :
70 : u32 sync_tsi, last_toi;
71 :
72 : u32 start_time, tune_time, last_timeout;
73 : GF_FilterPid *opid;
74 : GF_List *tsi_outs;
75 :
76 : u32 nb_stats;
77 : GF_List *received_seg_names;
78 :
79 : u32 nb_playing;
80 : Bool initial_play_forced;
81 : } ROUTEInCtx;
82 :
83 :
84 0 : static void routein_repair_segment_ts(ROUTEInCtx *ctx, GF_ROUTEEventFileInfo *finfo)
85 : {
86 : u32 i, pos;
87 0 : u8 *data = finfo->blob->data;
88 :
89 : pos = 0;
90 0 : for (i=0; i<finfo->nb_frags; i++) {
91 0 : u32 start_range = finfo->frags[i].offset;
92 0 : u32 end_range = finfo->frags[i].size;
93 :
94 0 : end_range += start_range;
95 : //reset all missed byte ranges as padding packets
96 0 : start_range -= pos;
97 0 : while (start_range % 188) start_range++;
98 0 : while (pos<start_range) {
99 0 : data[pos] = 0x47;
100 0 : data[pos+1] = 0x1F;
101 0 : data[pos+2] = 0xFF;
102 0 : data[pos+3] = 0x10;
103 0 : pos += 188;
104 : }
105 : //end range not aligned with a packet start, rewind position to prev packet start
106 0 : if (end_range % 188) {
107 0 : while (end_range % 188) end_range--;
108 : pos = end_range;
109 : }
110 : }
111 : //and patch all end packets
112 0 : while (pos<finfo->blob->size) {
113 0 : data[pos] = 0x47;
114 0 : data[pos+1] = 0x1F;
115 0 : data[pos+2] = 0xFF;
116 0 : data[pos+3] = 0x10;
117 0 : pos += 188;
118 : }
119 : //remove corrupted flag
120 0 : finfo->blob->flags = 0;
121 0 : }
122 :
123 : //top boxes we look for in segments
124 : static const char *top_codes[] = {"styp", "emsg", "prft", "moof", "mdat", "free", "sidx", "ssix"};
125 : static u32 nb_top_codes = GF_ARRAY_LENGTH(top_codes);
126 :
127 :
128 820 : u32 next_top_level_box(GF_ROUTEEventFileInfo *finfo, u8 *data, u32 size, u32 *cur_pos, u32 *box_size)
129 : {
130 820 : u32 pos = *cur_pos;
131 : u32 cur_frag = 0;
132 1640 : while (cur_frag < finfo->nb_frags) {
133 : //in range, can go
134 820 : if ((finfo->frags[cur_frag].offset <= pos) && (finfo->frags[cur_frag].offset + finfo->frags[cur_frag].size > pos)) {
135 : break;
136 : }
137 : //before range, adjust pos
138 7 : if (finfo->frags[cur_frag].offset > pos) {
139 : pos = finfo->frags[cur_frag].offset;
140 : break;
141 : }
142 : //after range, go to next
143 0 : cur_frag++;
144 : //current pos is outside last valid range, no more top-level boxes to parse
145 0 : if (cur_frag==finfo->nb_frags)
146 : return 0;
147 : }
148 :
149 166261 : while (pos < size) {
150 : u32 i;
151 : u32 type_idx = 0;
152 : u32 first_box = 0;
153 : u32 first_box_size = 0;
154 1325967 : for (i=0; i<nb_top_codes; i++) {
155 1326782 : if ((data[pos]==top_codes[i][0]) && (data[pos+1]==top_codes[i][1]) && (data[pos+2]==top_codes[i][2]) && (data[pos+3]==top_codes[i][3])) {
156 : first_box = pos;
157 : type_idx = i;
158 : break;
159 : }
160 : }
161 : //we need at least 4 bytes size header
162 166256 : if (first_box<4) {
163 165441 : pos++;
164 165441 : continue;
165 : }
166 815 : first_box_size = GF_4CC(data[first_box-4], data[first_box-3], data[first_box-2], data[first_box-1]);
167 815 : if (first_box_size<8) {
168 0 : pos++;
169 0 : continue;
170 : }
171 815 : *cur_pos = first_box-4;
172 815 : *box_size = first_box_size;
173 815 : return GF_4CC(top_codes[type_idx][0], top_codes[type_idx][1], top_codes[type_idx][2], top_codes[type_idx][3]);
174 : }
175 : return 0;
176 : }
177 :
178 9 : static void routein_repair_segment_isobmf(ROUTEInCtx *ctx, GF_ROUTEEventFileInfo *finfo)
179 : {
180 9 : u8 *data = finfo->blob->data;
181 9 : u32 size = finfo->blob->size;
182 9 : u32 pos = 0;
183 : u32 prev_moof_pos = 0;
184 : //walk through all possible top-level boxes in order
185 : //if box completely in a received byte range, keep as is
186 : //if mdat or free box, keep as is
187 : //otherwise change box type to free
188 824 : while (pos < size) {
189 : u32 i;
190 : Bool is_mdat = GF_FALSE;
191 : Bool box_complete = GF_FALSE;
192 : u32 prev_pos = pos;
193 820 : u32 box_size = 0;
194 820 : u32 type = next_top_level_box(finfo, data, size, &pos, &box_size);
195 : //no more top-level found, patch from current pos until end of payload
196 820 : if (!type) {
197 5 : u32 remain = size - pos;
198 : assert(remain);
199 5 : if (remain<8) {
200 0 : GF_LOG(GF_LOG_ERROR, GF_LOG_ROUTE, ("[ROUTE] Failed to patch end of corrupted segment, segment size not big enough to hold the final box header, something really corrupted in source data\n"));
201 5 : return;
202 : }
203 5 : data[pos] = (remain>>24) & 0xFF;
204 5 : data[pos+1] = (remain>>16) & 0xFF;
205 5 : data[pos+2] = (remain>>8) & 0xFF;
206 5 : data[pos+3] = (remain) & 0xFF;
207 5 : data[pos+4] = 'f';
208 5 : data[pos+5] = 'r';
209 5 : data[pos+6] = 'e';
210 5 : data[pos+7] = 'e';
211 : //remove corrupted flag
212 5 : finfo->blob->flags = 0;
213 : return;
214 : }
215 : //we missed a box header, insert one at previous pos, indicating a free box !!
216 815 : if (pos > prev_pos) {
217 2 : u32 missed_size = pos - prev_pos;
218 2 : data[prev_pos] = (missed_size>>24) & 0xFF;
219 2 : data[prev_pos+1] = (missed_size>>16) & 0xFF;
220 2 : data[prev_pos+2] = (missed_size>>8) & 0xFF;
221 2 : data[prev_pos+3] = (missed_size) & 0xFF;
222 2 : data[prev_pos+4] = 'f';
223 2 : data[prev_pos+5] = 'r';
224 2 : data[prev_pos+6] = 'e';
225 2 : data[prev_pos+7] = 'e';
226 : }
227 815 : if (type == GF_4CC('f','r','e','e')) {
228 : box_complete = GF_TRUE;
229 815 : } else if (type == GF_4CC('m','d','a','t')) {
230 271 : if (ctx->repair != ROUTEIN_REPAIR_STRICT) {
231 : box_complete = GF_TRUE;
232 : } else {
233 : is_mdat = GF_TRUE;
234 : }
235 544 : } else if (type == GF_4CC('m','o','o','f')) {
236 271 : prev_moof_pos = pos;
237 : }
238 :
239 815 : if (!box_complete) {
240 : //box is only partially received
241 0 : for (i=0; i<finfo->nb_frags; i++) {
242 544 : if (pos + box_size < finfo->frags[i].offset)
243 : break;
244 544 : if ((pos >= finfo->frags[i].offset) && (pos+box_size<=finfo->frags[i].offset + finfo->frags[i].size)) {
245 : box_complete = GF_TRUE;
246 : break;
247 : }
248 : }
249 : }
250 815 : if (box_complete) {
251 815 : pos += box_size;
252 815 : continue;
253 : }
254 : //incomplete mdat (strict mode), discard previous moof
255 0 : if (is_mdat) {
256 0 : data[prev_moof_pos+4] = 'f';
257 0 : data[prev_moof_pos+5] = 'r';
258 0 : data[prev_moof_pos+6] = 'e';
259 0 : data[prev_moof_pos+7] = 'e';
260 : }
261 : //incomplete box, move to free (not changing size)
262 0 : data[pos+4] = 'f';
263 0 : data[pos+5] = 'r';
264 0 : data[pos+6] = 'e';
265 0 : data[pos+7] = 'e';
266 0 : pos += box_size;
267 : }
268 : //remove corrupted flag
269 4 : finfo->blob->flags = 0;
270 : }
271 :
272 9 : static void routein_repair_segment(ROUTEInCtx *ctx, GF_ROUTEEventFileInfo *finfo)
273 : {
274 9 : if (ctx->repair==ROUTEIN_REPAIR_NO)
275 : return;
276 :
277 9 : if (finfo->blob->mx)
278 9 : gf_mx_p(finfo->blob->mx);
279 :
280 9 : if (strstr(finfo->filename, ".ts") || strstr(finfo->filename, ".m2ts")) {
281 0 : routein_repair_segment_ts(ctx, finfo);
282 : } else {
283 9 : routein_repair_segment_isobmf(ctx, finfo);
284 : }
285 :
286 9 : if (finfo->blob->mx)
287 9 : gf_mx_v(finfo->blob->mx);
288 : }
289 :
290 :
291 2939 : static GF_FilterProbeScore routein_probe_url(const char *url, const char *mime)
292 : {
293 2939 : if (!strnicmp(url, "atsc://", 7)) return GF_FPROBE_SUPPORTED;
294 2937 : if (!strnicmp(url, "route://", 8)) return GF_FPROBE_SUPPORTED;
295 2933 : return GF_FPROBE_NOT_SUPPORTED;
296 : }
297 :
298 :
299 6 : static void routein_finalize(GF_Filter *filter)
300 : {
301 6 : ROUTEInCtx *ctx = gf_filter_get_udta(filter);
302 :
303 : #ifdef GPAC_ENABLE_COVERAGE
304 6 : if (gf_sys_is_cov_mode())
305 6 : gf_route_dmx_purge_objects(ctx->route_dmx, 1);
306 : #endif
307 :
308 6 : if (ctx->clock_init_seg) gf_free(ctx->clock_init_seg);
309 6 : if (ctx->route_dmx) gf_route_dmx_del(ctx->route_dmx);
310 :
311 6 : if (ctx->tsi_outs) {
312 0 : while (gf_list_count(ctx->tsi_outs)) {
313 0 : TSI_Output *tsio = gf_list_pop_back(ctx->tsi_outs);
314 0 : gf_free(tsio);
315 : }
316 0 : gf_list_del(ctx->tsi_outs);
317 : }
318 6 : if (ctx->received_seg_names) {
319 5 : while (gf_list_count(ctx->received_seg_names)) {
320 4 : if (ctx->odir) {
321 4 : char *filedel = gf_list_pop_back(ctx->received_seg_names);
322 4 : if (filedel) gf_free(filedel);
323 : } else {
324 0 : SegInfo *si = gf_list_pop_back(ctx->received_seg_names);
325 0 : gf_free(si->seg_name);
326 0 : gf_free(si);
327 : }
328 : }
329 1 : gf_list_del(ctx->received_seg_names);
330 : }
331 6 : }
332 :
333 0 : static void push_seg_info(ROUTEInCtx *ctx, GF_FilterPid *pid, GF_ROUTEEventFileInfo *finfo)
334 : {
335 0 : if (ctx->received_seg_names) {
336 : SegInfo *si;
337 0 : GF_SAFEALLOC(si, SegInfo);
338 0 : if (!si) return;
339 0 : si->opid = pid;
340 0 : si->seg_name = gf_strdup(finfo->filename);
341 0 : gf_list_add(ctx->received_seg_names, si);
342 : }
343 0 : while (gf_list_count(ctx->received_seg_names) > ctx->max_segs) {
344 : GF_FilterEvent evt;
345 0 : SegInfo *si = gf_list_pop_front(ctx->received_seg_names);
346 0 : GF_FEVT_INIT(evt, GF_FEVT_FILE_DELETE, si->opid);
347 0 : evt.file_del.url = si->seg_name;
348 0 : gf_filter_pid_send_event(si->opid, &evt);
349 0 : gf_free(si->seg_name);
350 0 : gf_free(si);
351 : }
352 : }
353 :
354 0 : static void routein_send_file(ROUTEInCtx *ctx, u32 service_id, GF_ROUTEEventFileInfo *finfo, u32 evt_type)
355 : {
356 0 : if (!ctx->kc || !(finfo->blob->flags & GF_BLOB_CORRUPTED)) {
357 : u8 *output;
358 : char *ext;
359 : GF_FilterPid *pid, **p_pid;
360 : GF_FilterPacket *pck;
361 : TSI_Output *tsio = NULL;
362 :
363 0 : p_pid = &ctx->opid;
364 0 : if (finfo->tsi && ctx->stsi) {
365 0 : u32 i, count = gf_list_count(ctx->tsi_outs);
366 0 : for (i=0; i<count; i++) {
367 0 : tsio = gf_list_get(ctx->tsi_outs, i);
368 0 : if ((tsio->sid==service_id) && (tsio->tsi==finfo->tsi)) {
369 : break;
370 : }
371 : tsio=NULL;
372 : }
373 0 : if (!tsio) {
374 0 : GF_SAFEALLOC(tsio, TSI_Output);
375 0 : if (!tsio) return;
376 :
377 0 : tsio->tsi = finfo->tsi;
378 0 : tsio->sid = service_id;
379 0 : gf_list_add(ctx->tsi_outs, tsio);
380 : }
381 0 : p_pid = &tsio->opid;
382 :
383 0 : if ((evt_type==GF_ROUTE_EVT_FILE) || (evt_type==GF_ROUTE_EVT_MPD)) {
384 0 : if (ctx->skipr && !finfo->updated) return;
385 : }
386 : }
387 0 : pid = *p_pid;
388 :
389 0 : if (!pid) {
390 0 : pid = gf_filter_pid_new(ctx->filter);
391 0 : (*p_pid) = pid;
392 0 : gf_filter_pid_set_property(pid, GF_PROP_PID_STREAM_TYPE, &PROP_UINT(GF_STREAM_FILE));
393 : }
394 0 : gf_filter_pid_set_property(pid, GF_PROP_PID_ID, &PROP_UINT(tsio ? tsio->tsi : service_id));
395 0 : gf_filter_pid_set_property(pid, GF_PROP_PID_SERVICE_ID, &PROP_UINT(service_id));
396 0 : gf_filter_pid_set_property(pid, GF_PROP_PID_URL, &PROP_STRING(finfo->filename));
397 0 : ext = gf_file_ext_start(finfo->filename);
398 0 : gf_filter_pid_set_property(pid, GF_PROP_PID_FILE_EXT, &PROP_STRING(ext ? (ext+1) : "*" ));
399 :
400 0 : pck = gf_filter_pck_new_alloc(pid, finfo->blob->size, &output);
401 0 : if (pck) {
402 0 : memcpy(output, finfo->blob->data, finfo->blob->size);
403 0 : if (finfo->blob->flags & GF_BLOB_CORRUPTED) gf_filter_pck_set_corrupted(pck, GF_TRUE);
404 0 : gf_filter_pck_send(pck);
405 : }
406 :
407 0 : if (ctx->max_segs && (evt_type==GF_ROUTE_EVT_DYN_SEG))
408 0 : push_seg_info(ctx, pid, finfo);
409 : }
410 :
411 0 : while (gf_route_dmx_get_object_count(ctx->route_dmx, service_id)>1) {
412 0 : if (! gf_route_dmx_remove_first_object(ctx->route_dmx, service_id))
413 : break;
414 : }
415 : }
416 :
417 9 : static void routein_write_to_disk(ROUTEInCtx *ctx, u32 service_id, GF_ROUTEEventFileInfo *finfo, u32 evt_type)
418 : {
419 : char szPath[GF_MAX_PATH];
420 : FILE *out;
421 9 : if (!finfo->blob)
422 0 : return;
423 :
424 9 : if ((finfo->blob->flags & GF_BLOB_CORRUPTED) && !ctx->kc)
425 : return;
426 :
427 9 : sprintf(szPath, "%s/service%d/%s", ctx->odir, service_id, finfo->filename);
428 :
429 9 : out = gf_fopen(szPath, "wb");
430 9 : if (!out) {
431 0 : GF_LOG(GF_LOG_ERROR, GF_LOG_ROUTE, ("[ROUTE] Service %d failed to create MPD file %s\n", service_id, szPath ));
432 : } else {
433 9 : u32 bytes = (u32) gf_fwrite(finfo->blob->data, finfo->blob->size, out);
434 9 : gf_fclose(out);
435 9 : if (bytes != finfo->blob->size) {
436 0 : GF_LOG(GF_LOG_ERROR, GF_LOG_ROUTE, ("[ROUTE] Service %d failed to write file %s: %d written for %d total\n", service_id, finfo->filename, bytes, finfo->blob->size));
437 : }
438 : }
439 17 : while (gf_route_dmx_get_object_count(ctx->route_dmx, service_id)>1) {
440 11 : if (! gf_route_dmx_remove_first_object(ctx->route_dmx, service_id))
441 : break;
442 : }
443 :
444 9 : if (ctx->max_segs && (evt_type==GF_ROUTE_EVT_DYN_SEG)) {
445 5 : gf_list_add(ctx->received_seg_names, gf_strdup(szPath));
446 :
447 6 : while (gf_list_count(ctx->received_seg_names) > ctx->max_segs) {
448 1 : char *filedel = gf_list_pop_front(ctx->received_seg_names);
449 1 : if (filedel) {
450 1 : gf_file_delete(filedel);
451 1 : gf_free(filedel);
452 : }
453 : }
454 : }
455 : }
456 :
457 :
458 :
459 560 : void routein_on_event(void *udta, GF_ROUTEEventType evt, u32 evt_param, GF_ROUTEEventFileInfo *finfo)
460 : {
461 : char szPath[GF_MAX_PATH];
462 : ROUTEInCtx *ctx = (ROUTEInCtx *)udta;
463 : u32 nb_obj;
464 : Bool is_init = GF_TRUE;
465 : Bool is_loop = GF_FALSE;
466 : DownloadedCacheEntry cache_entry;
467 :
468 : //events without finfo
469 560 : if (evt==GF_ROUTE_EVT_SERVICE_FOUND) {
470 6 : if (!ctx->tune_time) ctx->tune_time = gf_sys_clock();
471 24 : return;
472 : }
473 554 : if (evt==GF_ROUTE_EVT_SERVICE_SCAN) {
474 2 : if (ctx->tune_service_id && !gf_route_dmx_find_atsc3_service(ctx->route_dmx, ctx->tune_service_id)) {
475 :
476 0 : GF_LOG(GF_LOG_ERROR, GF_LOG_ROUTE, ("[ROUTE] Asked to tune to service %d but no such service, tuning to first one\n", ctx->tune_service_id));
477 :
478 0 : ctx->tune_service_id = 0;
479 0 : gf_route_atsc3_tune_in(ctx->route_dmx, (u32) -2, GF_TRUE);
480 : }
481 : return;
482 : }
483 552 : if (!finfo)
484 : return;
485 :
486 : //events without finfo->blob
487 552 : if (evt==GF_ROUTE_EVT_FILE_DELETE) {
488 16 : if (ctx->gcache) {
489 11 : sprintf(szPath, "http://groute/service%d/%s", evt_param, finfo->filename);
490 11 : gf_dm_add_cache_entry(ctx->dm, szPath, NULL, 0, 0, "video/mp4", GF_FALSE, 0);
491 : }
492 : return;
493 : }
494 :
495 536 : if (!finfo->blob)
496 : return;
497 :
498 536 : cache_entry = finfo->udta;
499 536 : szPath[0] = 0;
500 536 : switch (evt) {
501 11 : case GF_ROUTE_EVT_MPD:
502 11 : if (!ctx->tune_time) ctx->tune_time = gf_sys_clock();
503 :
504 11 : if (ctx->odir) {
505 3 : routein_write_to_disk(ctx, evt_param, finfo, evt);
506 3 : break;
507 : }
508 8 : if (!ctx->gcache) {
509 0 : routein_send_file(ctx, evt_param, finfo, evt);
510 0 : break;
511 : }
512 :
513 8 : if (!ctx->opid) {
514 5 : ctx->opid = gf_filter_pid_new(ctx->filter);
515 5 : gf_filter_pid_set_property(ctx->opid, GF_PROP_PID_STREAM_TYPE, &PROP_UINT(GF_STREAM_FILE));
516 : }
517 8 : gf_filter_pid_set_property(ctx->opid, GF_PROP_PID_ID, &PROP_UINT(evt_param));
518 8 : gf_filter_pid_set_property(ctx->opid, GF_PROP_PID_SERVICE_ID, &PROP_UINT(evt_param));
519 8 : gf_filter_pid_set_property(ctx->opid, GF_PROP_PID_FILE_EXT, &PROP_STRING("mpd"));
520 8 : gf_filter_pid_set_property(ctx->opid, GF_PROP_PID_MIME, &PROP_STRING("application/dash+xml"));
521 :
522 8 : sprintf(szPath, "http://groute/service%d/%s", evt_param, finfo->filename);
523 8 : gf_filter_pid_set_property(ctx->opid, GF_PROP_PID_REDIRECT_URL, &PROP_STRING(szPath));
524 8 : gf_filter_pid_set_property(ctx->opid, GF_PROP_PID_URL, &PROP_STRING(szPath));
525 :
526 8 : cache_entry = gf_dm_add_cache_entry(ctx->dm, szPath, finfo->blob, 0, 0, "application/dash+xml", GF_TRUE, 0);
527 :
528 : sprintf(szPath, "x-route: %d\r\n", evt_param);
529 8 : gf_dm_force_headers(ctx->dm, cache_entry, szPath);
530 8 : gf_route_dmx_set_service_udta(ctx->route_dmx, evt_param, cache_entry);
531 :
532 8 : ctx->sync_tsi = 0;
533 8 : ctx->last_toi = 0;
534 8 : if (ctx->clock_init_seg) gf_free(ctx->clock_init_seg);
535 8 : ctx->clock_init_seg = NULL;
536 8 : ctx->tune_service_id = evt_param;
537 8 : break;
538 17 : case GF_ROUTE_EVT_DYN_SEG:
539 :
540 : //corrupted file, try to repair
541 17 : if (finfo->blob->flags & GF_BLOB_CORRUPTED) {
542 9 : routein_repair_segment(ctx, finfo);
543 : }
544 :
545 17 : if (ctx->odir) {
546 5 : routein_write_to_disk(ctx, evt_param, finfo, evt);
547 5 : break;
548 : }
549 12 : if (!ctx->gcache) {
550 0 : routein_send_file(ctx, evt_param, finfo, evt);
551 0 : break;
552 : }
553 : //fallthrough
554 :
555 : case GF_ROUTE_EVT_DYN_SEG_FRAG:
556 : //for now we only push complete files
557 509 : if (!ctx->gcache) {
558 : break;
559 : }
560 :
561 : #if 0
562 : //couldn't repair or this is a fragment
563 : if ((finfo->blob->flags & GF_BLOB_CORRUPTED) && !ctx->kc) {
564 :
565 : //force updating the cache entry since we may have reallocated the data buffer
566 : sprintf(szPath, "http://groute/service%d/%s", evt_param, finfo->filename);
567 : if (evt==GF_ROUTE_EVT_DYN_SEG_FRAG) {
568 : cache_entry = gf_dm_add_cache_entry(ctx->dm, szPath, finfo->blob, 0, 0, "video/mp4", GF_FALSE, finfo->download_ms);
569 : } else {
570 : if (ctx->fullseg)
571 : break;
572 : cache_entry = gf_dm_add_cache_entry(ctx->dm, szPath, finfo->blob, 0, 0, "video/mp4", GF_FALSE, finfo->download_ms);
573 : }
574 : //don't break yet, we want to signal the clock
575 : }
576 : #endif
577 :
578 509 : if (!ctx->clock_init_seg) {
579 8 : DownloadedCacheEntry mpd_cache_entry = gf_route_dmx_get_service_udta(ctx->route_dmx, evt_param);
580 8 : if (mpd_cache_entry) {
581 8 : ctx->clock_init_seg = gf_strdup(finfo->filename);
582 : sprintf(szPath, "x-route: %d\r\nx-route-first-seg: %s\r\n", evt_param, ctx->clock_init_seg);
583 8 : if (evt==GF_ROUTE_EVT_DYN_SEG_FRAG)
584 : strcat(szPath, "x-route-ll: yes\r\n");
585 8 : gf_dm_force_headers(ctx->dm, mpd_cache_entry, szPath);
586 8 : szPath[0] = 0;
587 : }
588 : }
589 :
590 509 : if ((finfo->blob->flags & GF_BLOB_CORRUPTED) && !ctx->kc)
591 : break;
592 :
593 : is_init = GF_FALSE;
594 509 : if (!ctx->sync_tsi) {
595 8 : ctx->sync_tsi = finfo->tsi;
596 8 : ctx->last_toi = finfo->toi;
597 501 : } else if (ctx->sync_tsi == finfo->tsi) {
598 478 : if (ctx->last_toi > finfo->toi) {
599 0 : GF_LOG(GF_LOG_WARNING, GF_LOG_ROUTE, ("[ROUTE] Loop detected on service %d for TSI %u: prev TOI %u this toi %u\n", ctx->tune_service_id, finfo->tsi, ctx->last_toi, finfo->toi));
600 :
601 0 : gf_route_dmx_purge_objects(ctx->route_dmx, evt_param);
602 : is_loop = GF_TRUE;
603 0 : if (cache_entry) {
604 0 : if (ctx->clock_init_seg) gf_free(ctx->clock_init_seg);
605 0 : ctx->clock_init_seg = gf_strdup(finfo->filename);
606 : sprintf(szPath, "x-route: %d\r\nx-route-first-seg: %s\r\nx-route-loop: yes\r\n", evt_param, ctx->clock_init_seg);
607 0 : gf_dm_force_headers(ctx->dm, cache_entry, szPath);
608 0 : szPath[0] = 0;
609 : }
610 : }
611 478 : ctx->last_toi = finfo->toi;
612 : }
613 : //fallthrough
614 :
615 : case GF_ROUTE_EVT_FILE:
616 :
617 520 : if (ctx->odir) {
618 1 : routein_write_to_disk(ctx, evt_param, finfo, evt);
619 1 : break;
620 : }
621 519 : if (!ctx->gcache) {
622 0 : routein_send_file(ctx, evt_param, finfo, evt);
623 0 : break;
624 : }
625 :
626 519 : if ((finfo->blob->flags & GF_BLOB_CORRUPTED) && !ctx->kc) return;
627 :
628 :
629 519 : if (!cache_entry) {
630 24 : sprintf(szPath, "http://groute/service%d/%s", evt_param, finfo->filename);
631 :
632 : //we copy over the init segment, but only share the data pointer for segments
633 24 : cache_entry = gf_dm_add_cache_entry(ctx->dm, szPath, finfo->blob, 0, 0, "video/mp4", is_init ? GF_TRUE : GF_FALSE, finfo->download_ms);
634 24 : if (cache_entry) {
635 24 : gf_dm_force_headers(ctx->dm, cache_entry, "x-route: yes\r\n");
636 24 : finfo->udta = cache_entry;
637 : }
638 : }
639 :
640 519 : if (evt==GF_ROUTE_EVT_DYN_SEG_FRAG) {
641 497 : GF_LOG(GF_LOG_INFO, GF_LOG_ROUTE, ("[ROUTE] Pushing fragment from file %s to cache\n", finfo->filename));
642 : break;
643 : }
644 :
645 :
646 22 : GF_LOG(GF_LOG_INFO, GF_LOG_ROUTE, ("[ROUTE] Pushing file %s to cache\n", finfo->filename));
647 22 : if (ctx->max_segs && (evt==GF_ROUTE_EVT_DYN_SEG))
648 0 : push_seg_info(ctx, ctx->opid, finfo);
649 :
650 22 : if (is_loop) break;
651 :
652 22 : nb_obj = gf_route_dmx_get_object_count(ctx->route_dmx, evt_param);
653 45 : while (nb_obj > ctx->nbcached) {
654 1 : if (!gf_route_dmx_remove_first_object(ctx->route_dmx, evt_param))
655 : break;
656 1 : nb_obj = gf_route_dmx_get_object_count(ctx->route_dmx, evt_param);
657 : }
658 : break;
659 : default:
660 : break;
661 : }
662 0 : }
663 :
664 59 : static Bool routein_local_cache_probe(void *par, char *url, Bool is_destroy)
665 : {
666 : ROUTEInCtx *ctx = (ROUTEInCtx *)par;
667 : u32 sid=0;
668 : char *subr;
669 59 : if (strncmp(url, "http://groute/service", 21)) return GF_FALSE;
670 :
671 49 : subr = strchr(url+21, '/');
672 49 : subr[0] = 0;
673 49 : sid = atoi(url+21);
674 49 : subr[0] = '/';
675 49 : if (is_destroy) {
676 14 : gf_route_dmx_remove_object_by_name(ctx->route_dmx, sid, subr+1, GF_TRUE);
677 35 : } else if (sid && (sid != ctx->tune_service_id)) {
678 0 : GF_LOG(GF_LOG_INFO, GF_LOG_ROUTE, ("[ROUTE] Request on service %d but tuned on service %d, retuning\n", sid, ctx->tune_service_id));
679 0 : ctx->tune_service_id = sid;
680 0 : ctx->sync_tsi = 0;
681 0 : ctx->last_toi = 0;
682 0 : if (ctx->clock_init_seg) gf_free(ctx->clock_init_seg);
683 0 : ctx->clock_init_seg = NULL;
684 0 : gf_route_atsc3_tune_in(ctx->route_dmx, sid, GF_TRUE);
685 : }
686 : return GF_TRUE;
687 : }
688 :
689 51504 : static GF_Err routein_process(GF_Filter *filter)
690 : {
691 51504 : ROUTEInCtx *ctx = gf_filter_get_udta(filter);
692 :
693 51504 : if (!ctx->nb_playing)
694 : return GF_EOS;
695 :
696 : while (1) {
697 52995 : GF_Err e = gf_route_dmx_process(ctx->route_dmx);
698 52995 : if (e == GF_IP_NETWORK_EMPTY) {
699 51496 : if (ctx->tune_time) {
700 47234 : if (!ctx->last_timeout) ctx->last_timeout = gf_sys_clock();
701 : else {
702 46263 : u32 diff = gf_sys_clock() - ctx->last_timeout;
703 46263 : if (diff > ctx->timeout) {
704 1 : GF_LOG(GF_LOG_INFO, GF_LOG_ROUTE, ("[ROUTE] No data for %d ms, aborting\n", diff));
705 : return GF_EOS;
706 : }
707 : }
708 : }
709 51495 : gf_filter_ask_rt_reschedule(filter, 1000);
710 : break;
711 1499 : } else if (!e) {
712 1499 : ctx->last_timeout = 0;
713 : }
714 : }
715 51495 : if (!ctx->tune_time) {
716 4262 : u32 diff = gf_sys_clock() - ctx->start_time;
717 4262 : if (diff>ctx->timeout) {
718 0 : GF_LOG(GF_LOG_ERROR, GF_LOG_ROUTE, ("[ROUTE] No data for %d ms, aborting\n", diff));
719 0 : gf_filter_setup_failure(filter, GF_SERVICE_ERROR);
720 0 : return GF_EOS;
721 : }
722 : }
723 :
724 51495 : if (ctx->stats) {
725 51495 : u32 now = gf_sys_clock() - ctx->start_time;
726 51495 : if (now >= ctx->nb_stats*ctx->stats) {
727 32 : ctx->nb_stats+=1;
728 32 : if (gf_filter_reporting_enabled(filter)) {
729 : Double rate=0.0;
730 : char szRpt[1024];
731 :
732 12 : u64 st = gf_route_dmx_get_first_packet_time(ctx->route_dmx);
733 12 : u64 et = gf_route_dmx_get_last_packet_time(ctx->route_dmx);
734 12 : u64 nb_pck = gf_route_dmx_get_nb_packets(ctx->route_dmx);
735 12 : u64 nb_bytes = gf_route_dmx_get_recv_bytes(ctx->route_dmx);
736 :
737 12 : et -= st;
738 12 : if (et) {
739 11 : rate = (Double)nb_bytes*8;
740 11 : rate /= et;
741 : }
742 12 : sprintf(szRpt, "[%us] "LLU" bytes "LLU" packets in "LLU" ms rate %.02f mbps", now/1000, nb_bytes, nb_pck, et/1000, rate);
743 12 : gf_filter_update_status(filter, 0, szRpt);
744 : }
745 : }
746 : }
747 :
748 : return GF_OK;
749 : }
750 :
751 :
752 6 : static GF_Err routein_initialize(GF_Filter *filter)
753 : {
754 : Bool is_atsc = GF_TRUE;
755 6 : ROUTEInCtx *ctx = gf_filter_get_udta(filter);
756 6 : ctx->filter = filter;
757 :
758 6 : if (!ctx->src) return GF_BAD_PARAM;
759 6 : if (!strncmp(ctx->src, "route://", 8)) {
760 : is_atsc = GF_FALSE;
761 2 : } else if (strcmp(ctx->src, "atsc://"))
762 : return GF_BAD_PARAM;
763 :
764 6 : if (ctx->odir) {
765 1 : ctx->gcache = GF_FALSE;
766 : }
767 :
768 6 : if (ctx->gcache) {
769 5 : ctx->dm = gf_filter_get_download_manager(filter);
770 5 : if (!ctx->dm) return GF_SERVICE_ERROR;
771 5 : gf_dm_set_localcache_provider(ctx->dm, routein_local_cache_probe, ctx);
772 : } else {
773 : //for now progressive dispatch is only possible when populating cache
774 1 : ctx->fullseg = GF_TRUE;
775 : }
776 6 : if (!ctx->nbcached)
777 0 : ctx->nbcached=1;
778 :
779 6 : if (is_atsc) {
780 2 : ctx->route_dmx = gf_route_atsc_dmx_new(ctx->ifce, ctx->buffer, routein_on_event, ctx);
781 : } else {
782 : char *sep, *root;
783 : u32 port;
784 4 : sep = strrchr(ctx->src+8, ':');
785 4 : if (!sep) {
786 0 : GF_LOG(GF_LOG_ERROR, GF_LOG_ROUTE, ("[ROUTE] Missing port number\n"));
787 : return GF_BAD_PARAM;
788 : }
789 4 : sep[0] = 0;
790 4 : root = strchr(sep+1, '/');
791 4 : if (root) root[0] = 0;
792 4 : port = atoi(sep+1);
793 4 : if (root) root[0] = '/';
794 :
795 4 : if (!gf_sk_is_multicast_address(ctx->src+8)) {
796 0 : GF_LOG(GF_LOG_ERROR, GF_LOG_ROUTE, ("[ROUTE] %s is not a multicast address\n"));
797 0 : sep[0] = ':';
798 0 : return GF_BAD_PARAM;
799 : }
800 4 : ctx->route_dmx = gf_route_dmx_new(ctx->src+8, port, ctx->ifce, ctx->buffer, routein_on_event, ctx);
801 4 : sep[0] = ':';
802 : }
803 6 : if (!ctx->route_dmx) return GF_SERVICE_ERROR;
804 :
805 6 : gf_route_set_allow_progressive_dispatch(ctx->route_dmx, !ctx->fullseg);
806 :
807 6 : gf_route_set_reorder(ctx->route_dmx, ctx->reorder, ctx->rtimeout);
808 :
809 6 : if (ctx->tsidbg) {
810 1 : gf_route_dmx_debug_tsi(ctx->route_dmx, ctx->tsidbg);
811 : }
812 :
813 6 : if (ctx->tunein>0) ctx->tune_service_id = ctx->tunein;
814 :
815 6 : if (is_atsc) {
816 2 : GF_LOG(GF_LOG_DEBUG, GF_LOG_ROUTE, ("[ROUTE] ATSC 3.0 Tunein started\n"));
817 2 : if (ctx->tune_service_id)
818 0 : gf_route_atsc3_tune_in(ctx->route_dmx, ctx->tune_service_id, GF_FALSE);
819 : else
820 2 : gf_route_atsc3_tune_in(ctx->route_dmx, (u32) ctx->tunein, GF_TRUE);
821 : }
822 :
823 6 : ctx->start_time = gf_sys_clock();
824 :
825 6 : if (ctx->stsi) ctx->tsi_outs = gf_list_new();
826 6 : if (ctx->max_segs)
827 1 : ctx->received_seg_names = gf_list_new();
828 :
829 6 : ctx->nb_playing = 1;
830 6 : ctx->initial_play_forced = GF_TRUE;
831 6 : return GF_OK;
832 : }
833 :
834 9 : static Bool routein_process_event(GF_Filter *filter, const GF_FilterEvent *evt)
835 : {
836 9 : ROUTEInCtx *ctx = gf_filter_get_udta(filter);
837 9 : if (evt->base.type==GF_FEVT_PLAY) {
838 5 : if (!ctx->initial_play_forced)
839 0 : ctx->nb_playing++;
840 5 : ctx->initial_play_forced = GF_FALSE;
841 : } else {
842 4 : ctx->nb_playing--;
843 : }
844 9 : return GF_TRUE;
845 : }
846 :
847 : #define OFFS(_n) #_n, offsetof(ROUTEInCtx, _n)
848 : static const GF_FilterArgs ROUTEInArgs[] =
849 : {
850 : { OFFS(src), "URL of source content - see filter help", GF_PROP_NAME, NULL, NULL, 0},
851 : { OFFS(ifce), "default interface to use for multicast. If NULL, the default system interface will be used", GF_PROP_STRING, NULL, NULL, GF_FS_ARG_HINT_ADVANCED},
852 : { OFFS(gcache), "indicate the files should populate GPAC HTTP cache - see filter help", GF_PROP_BOOL, "true", NULL, GF_FS_ARG_HINT_ADVANCED},
853 : { OFFS(tunein), "service ID to bootstrap on for ATSC 3.0 mode. 0 means tune to no service, -1 tune all services -2 means tune on first service found", GF_PROP_SINT, "-2", NULL, 0},
854 : { OFFS(buffer), "receive buffer size to use in bytes", GF_PROP_UINT, "0x80000", NULL, GF_FS_ARG_HINT_ADVANCED},
855 : { OFFS(timeout), "timeout in ms after which tunein fails", GF_PROP_UINT, "5000", NULL, 0},
856 : { OFFS(nbcached), "number of segments to keep in cache per service", GF_PROP_UINT, "8", NULL, GF_FS_ARG_HINT_EXPERT},
857 : { OFFS(kc), "keep corrupted file", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_ADVANCED},
858 : { OFFS(skipr), "skip repeated files - ignored in cache mode", GF_PROP_BOOL, "true", NULL, GF_FS_ARG_HINT_ADVANCED},
859 : { OFFS(stsi), "define one output pid per tsi/serviceID - ignored in cache mode, see filter help", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_EXPERT},
860 : { OFFS(stats), "log statistics at the given rate in ms (0 disables stats)", GF_PROP_UINT, "1000", NULL, GF_FS_ARG_HINT_ADVANCED},
861 : { OFFS(tsidbg), "gather only objects with given TSI (debug)", GF_PROP_UINT, "0", NULL, GF_FS_ARG_HINT_EXPERT},
862 : { OFFS(max_segs), "maximum number of segments to keep on disk", GF_PROP_UINT, "0", NULL, GF_FS_ARG_HINT_EXPERT},
863 : { OFFS(odir), "output directory for standalone mode - see filter help", GF_PROP_STRING, NULL, NULL, GF_FS_ARG_HINT_ADVANCED},
864 : { OFFS(reorder), "ignore order flag in ROUTE/LCT packets, avoiding considering object done when TOI changes", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_EXPERT},
865 : { OFFS(rtimeout), "default timeout in ms to wait when gathering out-of-order packets", GF_PROP_UINT, "5000", NULL, GF_FS_ARG_HINT_EXPERT},
866 : { OFFS(fullseg), "only dispatch full segments in cache mode (always true for other modes (source, standalone))", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_ADVANCED},
867 : { OFFS(repair), "repair mode for corrupted files (see filter help)\n"
868 : "- no: no repair is performed\n"
869 : "- simple: simple repair is performed (incomplete mdat boxes will be kept)\n"
870 : "- strict: incomplete mdat boxes will be lost as well as preceding moof box\n"
871 : "- full: HTTP-based repair, not yet implemented"
872 : , GF_PROP_UINT, "simple", "no|simple|strict|full", GF_FS_ARG_HINT_EXPERT},
873 :
874 : {0}
875 : };
876 :
877 : static const GF_FilterCapability ROUTEInCaps[] =
878 : {
879 : CAP_UINT(GF_CAPS_OUTPUT, GF_PROP_PID_STREAM_TYPE, GF_STREAM_FILE),
880 : };
881 :
882 : GF_FilterRegister ROUTEInRegister = {
883 : .name = "routein",
884 : GF_FS_SET_DESCRIPTION("ROUTE input")
885 : #ifndef GPAC_DISABLE_DOC
886 : .help = "This filter is a receiver for ROUTE sessions (ATSC 3.0 and generic ROUTE).\n"
887 : "- ATSC 3.0 mode is identified by the URL `atsc://`.\n"
888 : "- Generic ROUTE mode is identified by the URL `route://IP:PORT`.\n"
889 : "\n"
890 : "The filter can work in cached mode, source mode or standalone mode.\n"
891 : "# Cached mode\n"
892 : "The cached mode is the default filter behavior. It populates GPAC HTTP Cache with the received files, using `http://groute/serviceN/` as service root, N being the ROUTE service ID.\n"
893 : "In cached mode, repeated files are always pushed to cache.\n"
894 : "The maximum number of media segment objects in cache per service is defined by [-nbcached](); this is a safety used to force object removal in case DASH client timing is wrong and some files are never requested at cache level.\n"
895 : " \n"
896 : "The cached MPD is assigned the following headers:\n"
897 : "- x-route: integer value, indicates the ROUTE service ID.\n"
898 : "- x-route-first-seg: string value, indicates the name of the first segment (completely or currently being) retrieved from the broadcast.\n"
899 : "- x-route-ll: boolean value, if yes indicates that the indicated first segment is currently being received (low latency signaling).\n"
900 : "- x-route-loop: boolean value, if yes indicates a loop in the service has been detected (usually pcap replay loop).\n"
901 : " \n"
902 : "The cached files are assigned the following headers:\n"
903 : "- x-route: boolean value, if yes indicates the file comes from an ROUTE session.\n"
904 : "\n"
905 : "If [-max_segs]() is set, file deletion event will be triggered in the filter chain.\n"
906 : "\n"
907 : "# Source mode\n"
908 : "In source mode, the filter outputs files on a single output pid of type `file`. "
909 : "The files are dispatched once fully received, the output pid carries a sequence of complete files. Repeated files are not sent unless requested.\n"
910 : "If needed, one pid per TSI can be used rather than a single pid. This avoids mixing files of different mime types on the same pid (e.g. mpd and isobmff).\n"
911 : "EX gpac -i atsc://gcache=false -o $ServiceID$/$File$:dynext\n"
912 : "This will grab the files and forward them as output PIDs, consumed by the [fout](fout) filter.\n"
913 : "\n"
914 : "If [-max_segs]() is set, file deletion event will be triggered in the filter chain.\n"
915 : "\n"
916 : "# Standalone mode\n"
917 : "In standalone mode, the filter does not produce any output pid and writes received files to the [-odir]() directory.\n"
918 : "EX gpac -i atsc://:odir=output\n"
919 : "This will grab the files and write them to `output` directory.\n"
920 : "\n"
921 : "If [-max_segs]() is set, old files will be deleted.\n"
922 : "\n"
923 : "# File Repair\n"
924 : "In case of losses or incomplete segment reception (during tune-in), the files are patched as follows:\n"
925 : "- MPEG-2 TS: all lost ranges are adjusted to 188-bytes boundaries, and transformed into NULL TS packets.\n"
926 : "- ISOBMFF: all top-level boxes are scanned, and incomplete boxes are transformed in `free` boxes, except mdat kept as is if [-repair]() is set to simple.\n"
927 : "\n"
928 : "If [-kc]() option is set, corrupted files will be kept. If [-fullseg]() is not set and files are only partially received, they will be kept.\n"
929 : "\n"
930 : "# Interface setup\n"
931 : "On some systems (OSX), when using VM packet replay, you may need to force multicast routing on your local interface.\n"
932 : "For ATSC, you will have to do this for the base signaling multicast (224.0.23.60):\n"
933 : "EX route add -net 224.0.23.60/32 -interface vboxnet0\n"
934 : "Then for each ROUTE service in the multicast:\n"
935 : "EX route add -net 239.255.1.4/32 -interface vboxnet0\n"
936 : "",
937 : #endif //GPAC_DISABLE_DOC
938 : .private_size = sizeof(ROUTEInCtx),
939 : .args = ROUTEInArgs,
940 : .initialize = routein_initialize,
941 : .finalize = routein_finalize,
942 : SETCAPS(ROUTEInCaps),
943 : .process = routein_process,
944 : .process_event = routein_process_event,
945 : .probe_url = routein_probe_url
946 : };
947 :
948 2877 : const GF_FilterRegister *routein_register(GF_FilterSession *session)
949 : {
950 2877 : return &ROUTEInRegister;
951 : }
952 :
953 : #else
954 :
955 : const GF_FilterRegister *routein_register(GF_FilterSession *session)
956 : {
957 : return NULL;
958 : }
959 :
960 : #endif /* GPAC_DISABLE_ROUTE */
961 :
|