from pytube import YouTube
import os
def stream_video(url):
yt = YouTube(url)
stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
print(stream.download())
if __name__ == '__main__':
url = 'https://www.youtube.com/watch?v=d7Vw-iO3pb8'
stream_video(url)
import string
print(string.ascii_letters, string.ascii_lowercase, string.ascii_uppercase, string.digits, string.hexdigits, string.whitespace, string.punctuation)
import streamlink
from streamlink.stream import HLSStream
from streamlink.plugins.rtmpdump import RTMPStream
import time
from datetime import datetime
from time import sleep
def stream_m3u8(url, path):
streams = streamlink.streams(url)
stream = HLSStream(streams)
stream.open()
with open(path, 'wb') as f:
while True:
try:
time.sleep(1)
data = stream.read(1024)
f.write(data)
except KeyboardInterrupt:
print("Exiting")
break
stream.close()
from moviepy.editor import *
def stream_video(source, destination):
clip = VideoFileClip(source)
clip.write_videofile(destination, fps=24)
stream_video('source.m3u8', 'destination.mp4')
import pyM3U8
import subprocess
# url to m3u8 stream
url = 'https://xxxx.m3u8'
# get m3u8 data
m3u8_obj = pyM3U8.load(url)
# get the stream url
stream_url = m3u8_obj.segments[0].absolute_uri
# start convert to rtmp
subprocess.run('ffmpeg -i {} -c copy -f flv rtmp://localhost:1935/live/test'.format(stream_url))
def print_centralized(words):
print(words.center(50, ' '))
print_centralized('Hello World')
fix invalid codeSun, 11 Dec 2022 ## Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
## License
[MIT](https://choosealicense.com/licenses/mit/)
#!/usr/bin/env python
import subprocess
import sys
import re
import os
import time
import datetime
import uuid
def publish_youtube(title):
print "[>] Starting Streaming to Youtube ..."
proc = subprocess.Popen("ffmpeg -i %s -c:v libx264 -preset veryfast -maxrate 3000k -bufsize 6000k -pix_fmt yuv420p -g 50 -c:a aac -b:a 160k -ac 2 -ar 44100 -f flv rtmp://a.rtmp.youtube.com/live2/%s" % (title, key), stdout=sys.stdout,stderr=sys.stderr, shell=True)
time.sleep(4)
def start_streaming(url, title):
print "[>] Starting Streaming ..."
proc = subprocess.Popen("ffmpeg -re -i %s -c copy -f flv %s" % (url, title), stdout=sys.stdout, stderr=sys.st
from pym3u8 import load
m3u8_url = 'https://some.m3u8'
rtmp_url = 'rtmp://some.rtmp'
def stream_m3u8_to_rtmp():
m3u8 = load(m3u8_url)
for segment in m3u8.segments:
print("streaming segment: ", segment.absolute_uri)
subprocess.call("ffmpeg -i {} -c copy -bsf:a aac_adtstoasc {}".format(segment.absolute_uri, rtmp_url), shell=True)
print("end of stream??")
if __name__ == "__main__":
stream_m3u8_to_rtmp()
test = "https://www.youtube.com/watch?v=Nq2wYlWFucg"
test[test.find("https")+len("https"):]
def stream(url):
subprocess.call(["pyRTMP", "-a", "rtmp://a.rtmp.youtube.com/live2", "-y", url])
import random
def dice():
return random.randint(1,6)
dice()
/*school management */
CREATE TABLE IF NOT EXISTS 'students'(
'id' int(11) NOT NULL AUTO_INCREMENT,
'name' varchar(50) NOT NULL,
'age' int(2) NOT NULL,
'address' varchar(50) NOT NULL,
'class' varchar(50) NOT NULL,
PRIMARY KEY ('id'));
INSERT INTO 'students' ('name','age','address','class') VALUES ('muhaiminul islam','21','dhaka','cse');
INSERT INTO 'students' ('name','age','address','class') VALUES ('monirul','23','rajshahi','cse');
INSERT INTO 'students' ('name','age','address','class') VALUES ('ramiz','22','chittagong','cse');
INSERT INTO 'students' ('name','age','address','class') VALUES ('mahmud','23','khulna','cse');
INSERT INTO 'students' ('name','age','address','class
def send():
print("Sending your live stream to Youtube...")
os.system("ffmpeg -re -i "+videopath+" -c copy -f flv rtmp://a.rtmp.youtube.com/live2/"+"rtmp_key")
print("Done")
def stream_to_youtube(url):
clip = VideoFileClip("my_video.mp4")
clip.write_videofile("my_video.mp4")
import string
print(string.ascii_letters + string.digits + string.punctuation)
url = 'http://live.3gv.ifeng.com/zixun.m3u8'
key = 'fmW4bd8Kj'
def live(url,key):
import subprocess
p = subprocess.Popen(['ffmpeg','-i',url,'-c','copy','-f','flv','rtmp://live.hkstv.hk.lxdns.com/live/'+key],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
while p.poll() == None:
line = p.stdout.readline()
print(line)
live(url,key)
import sqlite3
conn = sqlite3.connect('database.db')
c = conn.cursor()
student_id = "2021"
student_name = "chris"
student_major = "CSE"
c.execute("CREATE TABLE IF NOT EXISTS student (id text, name text, major text)")
c.execute("INSERT INTO student (id, name, major) VALUES (?, ?, ?)", (student_id, student_name, student_major))
conn.commit()
c.execute("ALTER TABLE student ADD COLUMN gpa REAL")
c.execute("UPDATE student SET gpa = 4.0")
c.execute("DELETE FROM student WHERE id = ?", (student_id,))
c.execute("SELECT name, major, gpa FROM student ORDER BY major DESC")
c.execute("SELECT major, AVG(gpa) FROM student GROUP BY major")
import csv
def create_user(user_id):
password = input('Enter a password: ')
with open('users.csv', 'a') as csvfile:
fieldnames = ['user_id', 'password']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writerow({'user_id': user_id, 'password': password})
def find_password(user_id):
with open('users.csv', 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
if row['user_id'] == user_id:
print(row['password'])
create_user('123')
find_password('123')
from string import *
print(digits)
print(hexdigits)
print(octdigits)
print(punctuation)
print(printable)
print(whitespace)
def print_full_line():
width = shutil.get_terminal_size().columns
print("=" * width)
print_full_line()
from moviepy.editor import *
def stream_m3u8(url):
clip = VideoFileClip(url)
clip.preview()
stream_m3u8(url)
The code above is to create a random password for a specific length.
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
def main():
for x in range(0,10):
os.system("youtube-dl --extract-audio --audio-format mp3 --no-playlist https://www.youtube.com/watch?v=Nq2wYlWFucg")
if __name__ == '__main__':
main()
### 2. Write a program in Python to demonstrate the use of constructor and destructor.
def stream_to_youtube(m3u8, output_path):
# TODO
from subprocess import call
from datetime import datetime
from datetime import timedelta
import subprocess
import os
import time
from google.cloud import storage
def stream_to_youtube(data, context):
stream_name = data['stream_name']
hls_url = data['hls_url']
start_time = data['start_time']
end_time = data['end_time']
def upload_blob(bucket_name, source_file_name, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print('File {} uploaded to {}.'.format(
source_file_name,
destination_blob_name))
def upload_to_bucket(stream_name, hls_url, start_time, end_time):
cmd = '
def stream(url, out):
os.system("ffmpeg -i '{0}' -acodec copy -vcodec copy -f flv '{1}'".format(url, out))
def add(a, b):
return a + b
import string
print(string.ascii_letters + string.digits + string.punctuation + string.whitespace)
s = "abcdefghijklmnopqrstuvwxyz"
print(s.upper())
print(s.lower())
s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
print(s.upper())
print(s.lower())
s = "0123456789"
print(s.upper())
print(s.lower())
s = "!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~"
print(s.upper())
print(s.lower())
fix invalid codeSun, 11 Dec 2022 ---
**Exercise 2**
Write a program that prints out a multiplication table (5 x 5, for example) of the first five prime numbers. Ask the user for the number of multiplication tables they want to print out.
The program must work for any amount of multiplication tables and any amount of primes to calculate the tables out of.
_Hint:_ you will probably want to use nested for loops.
from moviepy.editor import *
import sys
url = (sys.argv[1])
clip = VideoFileClip(url)
clip = clip.set_fps(30).set_audio_fps(44100).set_audio_bitrate(128)
clip.write_videofile("output.flv", codec='libx264', audio_codec='aac')
from python_ffmpeg_video_streaming import Streaming
stream = Streaming("http://example.com/video.m3u8", "rtmp://a.rtmp.youtube.com/live2/<youtube-key>")
stream.start()
import m3u8
import subprocess
import time
def stream(url):
m3u8_obj = m3u8.load(url)
for segment in m3u8_obj.segments:
print(segment.uri)
subprocess.call(['ffmpeg', '-i', segment.uri, '-c', 'copy', '-bsf:a', 'aac_adtstoasc', '-f', 'flv', 'rtmp://a.rtmp.youtube.com/live2/x/xxxxxxxx'])
time.sleep(30)
CREATE TABLE IF NOT EXISTS `student` (
`id` int(11) NOT NULL,
`student_name` varchar(80) NOT NULL,
`student_class` int(11) NOT NULL,
`student_grade` int(11) NOT NULL,
`student_fees` int(11) NOT NULL,
`student_dob` date NOT NULL
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=latin1;
class Stack:
def __init__(self):
self.stack = []
def is_empty(self):
return self.stack == []
def push(self, item):
self.stack.append(item)
def pop(self):
return self.stack.pop()
def size(self):
return len(self.stack)
s = Stack()
print(s.is_empty())
s.push(4)
s.push('dog')
print(s.pop())
print(s.size())
import string
import random
def password_generator(size):
chars = string.ascii_lowercase + string.ascii_uppercase + string.digits + string.punctuation
return ''.join(random.choice(chars) for _ in range(size))
print(password_generator(16))
import shutil
cols = shutil.get_terminal_size().columns
def center(text):
return text.center(cols)
password_length = int(input("What's the length of the password? "))
password = "".join(random.choice(constants) for i in range(password_length))
print(password)
def sub(a, b):
return a - b
sub(1, 2)
import time
import ffmpeg
(
ffmpeg
.input('http://bitdash-a.akamaihd.net/content/MI201109210084_1/m3u8s/f08e80da-bf1d-4e3d-8899-f0f6155f6efa.m3u8')
.output('pipe:', format='mpegts')
.overwrite_output()
.global_args('-nostats', '-loglevel', '0')
.run_async(pipe_stdout=True, pipe_stderr=True)
.pipe(ffmpeg
.input('pipe:')
.output('rtmp://a.rtmp.youtube.com/live2/YOUTUBE_STREAMING_KEY', format='flv', vcodec='copy',
acodec='copy', rtmp_live='live', rtmp_pageurl='http://google.com', rtmp_buffer='50000',
rtmp_conn='S:
def main():
# stream m3u8 url to youtube
stream_m3u8_to_youtube("m3u8 url", "youtube channel url")
# stream m3u8 url to youtube api
stream_m3u8_to_youtube_api("m3u8 url", "youtube api url")
from tkinter import *
def result(a,b):
print(a,b)
return int(a) * int(b)
def btnclick(i):
operation = i
if(operation == "add"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
elif(operation == "sub"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
elif(operation == "mul"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
elif(operation == "div"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
else:
def stream(url)
rtmpUrl = 'rtmp://ip:port/app/stream'
cmd = 'ffmpeg -re -i "{0}" -acodec libmp3lame -ar 44100 -b:a 128k -pix_fmt yuv420p -profile:v baseline -s 1280x720 -bufsize 6000k -vb 400k -maxrate 1500k -deinterlace -vcodec libx264 -preset veryfast -g 30 -r 30 -f flv "{1}"'.format(url,rtmpUrl)
os.system(cmd)
stream('m3u8 url here')
import os
def backup_mysql_db(db_name, db_user, db_pass, db_host="localhost", db_port=3306):
"""
Function that perform a backup of the database to the OS temporary
directory (/tmp).
"""
db_name = db_name
db_user = db_user
db_pass = db_pass
db_host = db_host
db_port = db_port
# create the backup file and set the file name
bkp_file = db_name + '_backup.sql'
# create the command that will be used to create the backup
bkp_cmd = 'mysqldump -u ' + db_user + ' -p' + db_pass + ' -h ' + db_host + ' -P ' + str(db_port) + ' ' + db_name + ' > ' + '/tmp/' + bkp_file
# perform the backup
os.system(bkp_cmd)
return bk
import os
def update_marks(roll_number, marks):
f = open("students_marks.dat", "rb+")
f.seek(roll_number * 24)
# get rid of the trailing \n
name = f.read(20).strip()
f.seek(roll_number * 24)
f.write(name + str(marks))
f.close()
import pym3u8
import cv2
import youtube_dl
def download_video(url, quality):
video_url = url
ydl_opts = {
'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio'
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
print('Downloaded in Quality ' + quality)
if __name__ == '__main__':
# url = 'https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_4x3/bipbop_4x3_variant.m3u8'
url = 'http://103.13.0.171:8081/hls/test.m3u8'
stream = pym3u8.load(url)
quality = '480'
while True:
video_path = stream.segments[
def streamlink(url, quality):
# get the streamlink
streams = streamlink.streams(url)
# get the stream of best quality
stream = streams[quality]
# get streams url
url = stream.to_url()
# return streams url
return url
def print_line(length):
for i in range(length):
print('=', end='')
def backup_database(source, destination):
#backup the database
backup_database('/mysql/db01', '/backups/db01')
def m3u8_to_rtmp(m3u8_url, rtmp_url, timeout=300):
ff = FFmpeg(
inputs={m3u8_url: None},
outputs={rtmp_url: '-c copy -map 0'})
ff.run(cmd='ffmpeg', timeout=timeout)
translateSun, 25 Dec 2022 def python():
print("Python")
CREATE TABLE students (
id INTEGER,
name TEXT,
marks REAL
);
INSERT INTO students (id, name, marks)
VALUES (1, 'John Doe', 90.5);
SELECT * FROM students;
ALTER TABLE students ADD Class INTEGER;
UPDATE students SET Class = 1 WHERE id = 1;
UPDATE students SET marks = 100 WHERE id = 1;
DELETE FROM students WHERE id = 1;
SELECT * FROM students ORDER BY marks DESC;
SELECT * FROM students ORDER BY marks ASC;
SELECT avg(marks), min(marks), max(marks), sum(marks), count(*) FROM students;
def add(a, b):
"""
This function simply adds two numbers together.
Don't worry, we'll make this docstring better soon.
"""
return a + b
import string
print(string.ascii_letters + string.digits + string.punctuation)
print("""
abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
""")
import random
password = []
for i in range(8):
password.append(random.choice('abcdefghijklmnopqrstuvwxyz'))
random.shuffle(password)
password = ''.join(password)
print(password)
def stream(self):
print("Streaming started...\n")
stream = ffmpeg.input(self.url)
stream = ffmpeg.output(stream, self.rtmpUrl, **self.ffmpegParams)
ffmpeg.run(stream, **self.ffmpegParams)
print("Streaming ended...")
def stream_to_rtmp(m3u8_url, output, swf_url, page_url, token_url, timeout=10, verbose=False):
rtmpdump_cmd = ['rtmpdump', '-r', m3u8_url, '-o', output, '--swfVfy', swf_url, '--pageUrl', page_url, '--token',
token_url, '--live']
if verbose:
rtmpdump_cmd.append('--verbose')
process = subprocess.Popen(rtmpdump_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
count = 0
while True:
# Wait for the process to finish and timeout every 10 seconds
exit_code = process.poll()
if exit_code is not None:
break
time.sleep(1)
count += 1
if count >= timeout:
process.terminate()
break
import string
def command():
return string.ascii_letters, string.ascii_lowercase, string.ascii_uppercase, string.digits, string.hexdigits, string.whitespace , string.punctuation
print(command())
opencv_ffmpeg.set_ffmpeg_bin('/Users/albertquispe/Desktop/ffmpeg')
test_stream = opencv_ffmpeg.create_cv_capture('https://XXXXX.m3u8')
yts = YouTubeStreaming('username', 'password')
yts.start_video_stream(test_stream, 'stream_title', 'stream_description')
import moviepy.editor as mp
import urllib.request
import urllib.parse
def stream_to_youtube(url, title=None, description=None, keywords=None, category=None, privacyStatus="private"):
"""Function to stream video file to youtube
Args:
url: Url of the video file
title: Title of the video
description: Description of the video
keywords: Keywords of the video
category: Category of the video
privacyStatus: Privacy status of the video
"""
# Get the video
video = mp.VideoFileClip(url)
# Get the audio
audio = video.audio
# Make a temporary file for the audio
temp_audio = "temp-audio.m4a"
# Write the audio to file
audio.write_audiofile(temp_audio)
# Download the video to file
video.write_videofile("temp-video.mp4", audio=temp_audio)
# Get the video size in MB
video_size = video
translateTue, 03 Jan 2023 def add(a, b):
return a + b
def print_hello(name):
print("hello " + name)
The function add takes two arguments a and b and returns the sum of a and b.
Note: the symbols ''' are used only to explain the code.
"""
explanation = re.sub(r' ', '', explanation)
explanation = re.sub(r'
from moviepy.editor import *
from moviepy.editor import *
from moviepy.video.fx.all import *
from moviepy.audio.fx.all import *
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
def main():
"""
This is the main function, where the program starts
"""
# Replace with the actual M3U8 URL
url = "https://xxxxxxx.m3u8"
# Open the video file
original = VideoFileClip(url)
# Call the function for creating the new video
# The new video is called "cut_video"
cut_video = generate_new_video(original)
# Save the new video
cut_video.write_videofile("output.flv", codec='libx264', audio_codec='aac')
def generate_new_video(original):
"""
This function will generate the new video
The original video is passed
def stream (x):
clip = VideoFileClip(x)
return clip.ipython_display()
from string import *
from pym3u8 import load
import requests
import subprocess
import time
while True:
m3u8_url='http://xxxx/live/stream.m3u8'
interval = 10
rtmp_url = "rtmp://xxxx/live/stream"
r = requests.get(m3u8_url)
if r.status_code == 200:
playlist = load(m3u8_url)
if playlist.segments:
cmd = "ffmpeg -re -i {0} -c copy -f flv {1}".format(playlist.segments[0].absolute_uri, rtmp_url)
print (cmd)
subprocess.call(cmd, shell=True)
time.sleep(interval)
def say_hello(to_whom = "World"):
print("Hello " + to_whom + "!")
say_hello()
say_hello("Ben")
def print_full_line(input_string):
divider = '=' * len(input_string)
print(divider)
print_full_line('abc')
import streamlink
streamlink.streams("https://www.youtube.com/watch?v=PxoZPQ2wiwk")
streamlink --stream-url http://www.youtube.com/watch?v=PxoZPQ2wiwk best
import logging
import m3u8
import os
import ffmpeg
import requests
import subprocess
logging.basicConfig(level=logging.DEBUG)
def download_hls_video(url, out_path):
"""Download a HLS video and save it to disk."""
m3u8_obj = m3u8.load(url)
base_url = '/'.join(url.split('/')[:-1])
# check if output path exists
if not os.path.exists(out_path):
os.makedirs(out_path)
# if there's only one segment, download it, no need to merge
if len(m3u8_obj.segments) == 1:
logging.info('only one segment, downloading without merging')
# create the output path
output_path = os.path.join(out_path,
m3u8_obj.segments[0].uri.split('/')[-1])
# download and save the video
def Hotel Management System(name, ID, password, phone_num, email, room_num, check_in, check_out):
if name == name and ID == ID and password == password and phone_num == phone_num and email == email:
return room_num, check_in, check_out
else:
return 'Invalid Credentials'
Hotel Management System('Ali', '100', '123', '123456789', '123@gmail.com', '101', '12/3/2020', '12/4/2020')
So, The user can easily understand the working of the program.
##### 2.2.2 Documenting the code
Documentation is an integral part of the coding. It is a very useful tool for the developers to understand the code better. Documentation helps the other developers to understand the code a lot better way and also helps them to modify the code as per their needs.
Documentation also helps in maintaining the code. Its like a reference book of the code written by the original programmer.
### 2.3 Functionalities
#### 2.3.1 Stationery
import os
os.system("""
gst-launch-1.0 playbin uri=https://videocdn-us.geocdn.scaleengine.net/v0/video/streams/saber-test-chunklist.m3u8
! tee name=pre
! queue
! mpegtsmux name=mux
! rtmpsink location='rtmp://127.0.0.1/live/test'
pre. ! queue
! mux.
""")
def stream(self, uri, rtmp_url):
m3u8 = M3U8(uri)
if not m3u8.is_variant:
self.stream_single_m3u8(m3u8, rtmp_url)
else:
for playlist in m3u8.playlists:
self.stream_single_m3u8(playlist, rtmp_url)
import shutil
def print_centered(text):
screen_width = shutil.get_terminal_size().columns
text_len = len(text)
left_margin = (screen_width - text_len) // 2
print(' ' * left_margin, text)
import os
os.system("ffmpeg -re -i " + '"http://path.to.m3u8"' + " -c:v copy -c:a aac -strict experimental -f flv rtmp://a.rtmp.youtube.com/live2/" + '"YOUR_STREAM_KEY"')
stack = []
stack.append(3)
stack.append(2)
stack.append(1)
print stack
print stack.pop()
print stack
import os
import subprocess
def youtube_live(m3u8_url, youtube_url):
os.system("rtmpdump -r '" + m3u8_url + "' -o -|ffmpeg -i - -vcodec copy -acodec copy -f flv '" + youtube_url + "'")
def youtube_live_subprocess(m3u8_url, youtube_url):
process = subprocess.check_output("rtmpdump -r '" + m3u8_url + "' -o -|ffmpeg -i - -vcodec copy -acodec copy -f flv '" + youtube_url + "'", shell=True)
def youtube_live_subprocess_popen(m3u8_url, youtube_url):
process = subprocess.Popen("rtmpdump -r '" + m3u8_url + "' -o -|ffmpeg -i - -vcodec copy -acodec copy -f flv '" + youtube_url + "'", shell=True)
def stream_m3u8(url_m3u8, rtmp_url=RTMP_URL):
streamlink = Streamlink()
streams = streamlink.streams(url_m3u8)
if streams:
stream = streams["best"]
stream.open()
stream.pipe()
stream.close()
return 0
else:
print("Stream not available")
return 1
stream_m3u8(url_m3u8)
def main(url):
clip = VideoFileClip(url)
clip.preview()
main("my_video.mp4")
print("https://www.youtube.com/watch?v=Nq2wYlWFucg".replace("https", ""))
print('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890~!@#$%^&*()_+-=')
from string import *
constants = ascii_letters + digits + punctuation
print(constants)
password_length = int(input("What's the length of the password? "))
password = ""
for i in range(password_length):
password += random.choice(constants)
print(password)
def _get_stream_data(self, m3u8_url):
"""Get stream data from the m3u8 url.
Parameters
----------
m3u8_url : str
Url of the m3u8 stream data.
Raises
------
NotImplementedError
If not implemented by the subclass.
"""
req = urllib.request.Request(
m3u8_url, headers={'User-Agent': 'Mozilla/5.0'})
m3u8_obj = m3u8.load(urllib.request.urlopen(req))
if m3u8_obj.is_variant:
for playlist in m3u8_obj.playlists:
bandwidth = int(playlist.stream_info.bandwidth)
program = playlist.stream_info.program_id
resolution = playlist.stream_info.resolution
if resolution:
self.streams[
f'{bandwidth}-{program}'] = {
def line(output_screen_width):
return '=' * output_screen_width
line(80)
import subprocess
import os
def live_stream(m3u8_url,rtmp_url):
cmd = "ffmpeg -i '"+ m3u8_url +"' -c copy -f flv '"+ rtmp_url +"'"
os.system(cmd)
CREATE PROCEDURE program_for_data_entries_for_school_management()
BEGIN
IF code_lines=500
THEN
CREATE DATABASE entries_for_school_management
END IF;
END;
from math import sqrt
def pythagorean(a, b):
return sqrt(a * a + b * b)
translateTue, 03 Jan 2023 def add(a, b):
return a + b
sudo apt-get install libav-tools
def stream(a, b):
return a + b
stream(1, 2)
def stream_to_youtube(url):
print(url)
stream_to_youtube("smth")
import pyffmpeg
ff = pyffmpeg.FFMpeg()
stream = ff.open('http://example.com/live/channel1.m3u8')
output = ff.open('rtmp://example.com:1935/live/channel1')
while True:
frame, val = stream.read()
if val == 0:
break
output.write(frame)
def print_centralized_words(words, sep=' ', end='\n', file=sys.stdout, flush=False):
print(words, sep=sep, end=end, file=file, flush=flush)
def createProject():
#create project
#create file
#create virtual environment
#add packages
#create db
#configure db
#add app
return True
createProject()
from urllib import request
url = "http://www.google.com"
x = request.urlopen(url)
print(x.read())
def process_line(line):
word = ''
for c in line:
if c.isalpha():
word += c
else:
if word:
print(word, end='#')
word = ''
if word:
print(word, end='#')
print()
with open('filename.txt') as infile:
for line in infile:
process_line(line)
from urllib2 import urlopen
from livestreamer import Livestreamer
from livestreamer.stream import StreamError
url = 'http://fmsb.mediacdn.vn/live/fptbongda1_500.m3u8'
livestreamer = Livestreamer()
streams = livestreamer.streams(url)
try:
stream = streams["best"]
print stream.url
except StreamError, err:
print "Unable to find stream: {0}".format(err)
pip install youtube_upload
from youtube_upload import upload_video
upload_video(file='videoplayback.mp4',
title='video title',
description='video description',
keywords=['tag1', 'tag2'],
category=22,
privacy=private)
# to clear the console
import os
os.system('clear')
def print_line(length):
print("=" * length)
print_line(5)
def print_full_line():
width = shutil.get_terminal_size().columns
print("=" * width)
print_full_line()
#!/usr/bin/env python3
import time
import sys
import os
import logging
import signal
from ffms import FFMS_Init, FFMS_GetVersion, FFMS_Destroy
from ffms import FFMS_GetError
from ffms import FFMS_Error
from ffms import FFMS_SetLogLevel
from ffms import FFMS_LogLevel
from ffms import FFMS_SetVideoSource
from ffms import FFMS_Index
from ffms import FFMS_TrackType
from ffms import FFMS_GetNumTracks
from ffms import FFMS_GetTrackFromIndex
from ffms import FFMS_GetFrame
from ffms import FFMS_SetOutputFormatV
from ffms import FFMS_GetVideoProperties
from ffms import FFMS_GetPixFmt
from ffms import FFMS_PIX_FMT_BGRA
from ffms import FFMS_PIX_FMT_RGBA
from ffms import FFMS_PIX_FMT_BGR24
from ffms import FFMS_ReadFrame
import string
print(dir(string))
def generate_function(input_list, output_list, function_name, is_list_output=False):
# check if the lists have the same length
if len(input_list) != len(output_list):
raise ValueError('Inconsistent list sizes')
# get the type of the variables
input_type = get_input_type(input_list)
output_type = get_input_type(output_list)
# check if the output is a list
if is_list_output:
output_type = 'list[' + output_type + ']'
# create the function
func = (function_name + '(' + ', '.join(input_type) + ') -> ' + output_type + ':\n')
# add the statements
for i in range(len(input_list)):
func += (' if ' + get_if_statement(input_list[i]) + ':\n')
func += (' return ' + get_if_return(output_list[i], output_
import ffmpeg
stream = ffmpeg.input('https://bitdash-a.akamaihd.net/content/MI201109210084_1/m3u8s/f08e80da-bf1d-4e3d-8899-f0f6155f6efa.m3u8')
audio = ffmpeg.output(stream, '-c:a aac -b:a 128k myaudio.m4a')
video = ffmpeg.output(stream, '-c:v h264 -b:v 1M myvideo.mp4')
ffmpeg.run(audio, video)
def stream_m3u8_to_rtmp(m3u8_url, output_rtmp_url):
from pym3u8 import Loader, dump
stream = Loader(m3u8_url).streams[0]
stream.dump(output_rtmp_url)
import os
def createFile(filename, rollno, name):
try:
file = open(filename, "wb")
file.write(str(rollno) + "," + name + "\n")
file.close()
print("File created successfully")
except IOError as e:
print("Error in file creation: " + str(e))
def searchFile(filename, rollno):
try:
file = open(filename, "rb")
for line in file:
roll, name = line.decode().split(",")
if roll == str(rollno):
print("Name: " + name)
break
else:
print("Not Found")
file.close()
except IOError as e:
print("Error in file read: " + str(e))
def main():
filename = input("Enter filename: ")
ch = 0
while ch != 3:
print("1. Create File")
print("2. Search File")
print("3
def add(a, b):
return a + b
# fix the above invalid code
def add(a, b):
return a + b
def divide(a, b):
return a / b
print(add(1, 2))
print(divide(2, 1))
def stream_to_rtmp(m3u8_path, rtmp_path):
from pyffmpeg import FFMPEG
ff = FFMPEG(m3u8_path)
ff.cmd.extend(['-f', 'flv', '-vcodec', 'copy', '-acodec', 'copy', '-copyts', '-copytb', '0', rtmp_path])
ff.run()
stream_to_rtmp('http://m3u8.com/m3u8.m3u8', 'rtmp://rtmp.com/app/stream')
is the same as
import subprocess
import re
import os
import time
import requests
# parse the m3u8 url to get the urls of all the ts segments
def parse_m3u8_url(url):
r = requests.get(url)
m3u8_urls = re.findall(r'\n(http[s]?://.+\.ts)', r.text)
ffmpeg_cmd = 'ffmpeg -i "concat:'
for url in m3u8_urls:
ffmpeg_cmd = ffmpeg_cmd + url + '|'
return ffmpeg_cmd[:-1] + '" -acodec copy -vcodec copy -absf aac_adtstoasc output.mp4'
def download(cmd):
# process the command in linux terminal and return the output
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
output = process.communicate()
return output
def run(url):
def merge_sort(arr):
if len(arr) > 1:
mid = len(arr) // 2
L = arr[:mid]
R = arr[mid:]
merge_sort(L)
merge_sort(R)
i = j = k = 0
while i < len(L) and j < len(R):
if L[i] < R[j]:
arr[k] = L[i]
i += 1
else:
arr[k] = R[j]
j += 1
k += 1
while i < len(L):
arr[k] = L[i]
i += 1
k += 1
while j < len(R):
arr[k] = R[j]
j += 1
k += 1
import pym3u8
import requests
import ffmpeg
import os
url = "file:///home/prajwala/Downloads/hls/master.m3u8"
os.environ["http_proxy"] = "http://127.0.0.1:8080"
os.environ["https_proxy"] = "http://127.0.0.1:8080"
m3u8 = pym3u8.load(url)
for playlist in m3u8.playlists:
print(playlist.absolute_uri)
stream = ffmpeg.input(str(playlist.absolute_uri))
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/[stream-key]', rtmp_live='live', rtmp_conn='S:123456', rtmp_app='rtmp://a.rtmp.youtube.com/live2/[stream-key]')
ffmpeg.run(stream)
import sys
import os
import subprocess
import shlex
import time
import re
import random
import string
import urllib2
import urllib
import base64
import json
import datetime, time
import requests
import getopt
import logging
import logging
import logging.handlers
#python read_m3u8.py --input_file=input.txt --input_url=input_url.txt
def read_m3u8(in_file):
f = open(in_file, 'r')
while True:
line = f.readline()
if not line:
break
#print(line.strip())
return line.strip()
f.close()
def main(argv):
input_file = ''
input_url = ''
try:
opts, args = getopt.getopt(argv, "hi:u:", ["help", "input_file=", "input_url="])
except getopt.GetoptError:
print 'read_
import subprocess
subprocess.call([
"ffmpeg",
"-i",
"http://183.182.84.51:1935/live/9CE4475AF8/playlist.m3u8",
"-c",
"copy",
"-f",
"flv",
"rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx"
])
import urllib
import urllib2
values = {'q' : 'python programming tutorials'}
data = urllib.urlencode(values)
url = 'http://www.google.com/search?' + data
req = urllib2.Request(url)
resp = urllib2.urlopen(req)
respData = resp.read()
saveFile = open('withHeaders.txt','w')
saveFile.write(str(respData))
saveFile.close()
import moviepy.editor as mp
def stream(url):
clip = mp.VideoFileClip(url)
clip.write_videofile("stream.mp4")
stream('https://content.jwplatform.com/manifests/yp34SRmf.m3u8')
import streamer
import ffmpeg
stream = streamer.Streamer('https://xxxxxxxxxx.m3u8')
stream.set_ffmpeg(ffmpeg.FFMpeg(ffmpeg.FFProbe))
stream.start()
def add(a, b):
return a + b
add(1, 2)
print(str.center(“hello”, 50, “*”))
from pyffmpeg import FFmpeg
ff = FFmpeg(
inputs={'input1.mp3': '-re -i'},
outputs={'rtmp://localhost/live/myStream': '-c:v copy -c:a aac -f flv'}
)
ff.run()
from moviepy.editor import VideoFileClip
import argparse
import subprocess
def m3u8_to_mp4(m3u8_url, mp4_name):
subprocess.call(["ffmpeg", "-i", m3u8_url, "-c", "copy", mp4_name])
def main():
parser = argparse.ArgumentParser(
description='Get a m3u8 url from a provider and download it as mp4'
)
parser.add_argument(
'm3u8_url',
help='m3u8 url from provider'
)
parser.add_argument(
'mp4_name',
help='name for the mp4 file'
)
m3u8_to_mp4()
if __name__ == '__main__':
main()
def say_hello():
return 'hello'
say_hello()
# Hello World Program
print("hello")
# Hello World Program
print("hello")
# Hello World Program
print("hello")
# Hello World Program
print("hello")
# Hello World Program
print("hello")
# Hello World Program
print("hello")
# Hello World Program
print("hello")
import subprocess
m3u8_url = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
rtmp_url = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
cmd = 'ffmpeg -re -i "' + m3u8_url + '" -vcodec copy -acodec aac -strict -2 -f flv "' + rtmp_url +'"'
p = subprocess.Popen(cmd, shell=True)
#install module
#python3 -m pip install python-ffmpeg-video-streaming
#import module
from python_ffmpeg_video_streaming import Input, Output, Stream
#create inputs and outputs
input1 = Input('http://liveserver.com/video.m3u8')
output = Output('rtmp://streamserver', Stream(input1))
#stream with python-ffmpeg-video-streaming
output.run()
import youtube_dl
import ffmpeg
import sys
import os
url = 'http://qthttp.apple.com.edgesuite.net/1010qwoeiuryfg/sl.m3u8'
ydl_opts = {}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
# os.system('ffmpeg -i "' + str(url) + '" -codec copy ' + str(sys.argv[1])+'.mp3 ')
import pafy
url = 'http://qthttp.apple.com.edgesuite.net/1010qwoeiuryfg/sl.m3u8'
video = pafy.new(url)
bestaudio = video.getbestaudio()
bestaudio.download()
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLSStream
stream = HLSStream(
ffmpeg_path=r'C:\Users\Nancy\AppData\Local\Programs\Python\Python36\Scripts\ffmpeg.exe',
input_path='https://bitdash-a.akamaihd.net/content/MI201109210084_1/m3u8s/f08e80da-bf1d-4e3d-8899-f0f6155f6efa.m3u8',
output=r'C:\Users\Nancy\Desktop\ffmpeg_streaming-master\output.mp4',
format='mp4')
stream.stream()
def generate_flowchart(code):
#some code here
return flowchart
generate_flowchart("a = a + b")
def print_statement():
statement = "Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together. Python's simple, easy to learn syntax emphasizes readability and therefore reduces the cost of program maintenance. Python supports modules and packages, which encourages program modularity and code reuse. The Python interpreter and the extensive standard library are available in source or binary form without charge for all major platforms, and can be freely distributed."
statement = statement.replace("The Python interpreter and the extensive standard library are available in source or binary form without charge for all major platforms, and can be freely distributed.", "")
print(statement)
print_statement()
def add(a, b):
return a + b
add(1, 2)
The function add takes two arguments a and b and returns the sum of a and b.
def stream(url):
"""
Program to stream m3u8 url to youtube in python
"""
import subprocess
from pytube import YouTube
from moviepy.editor import VideoFileClip
# Download video from internet
yt = YouTube(url)
yt.streams.first().download()
# Convert video to mp4
video = VideoFileClip(yt.title)
video.write_videofile("video.mp4")
# Upload video to youtube
subprocess.call("youtube-upload --title=\"{}\" --client-secrets secrets.json --private video.mp4".format(yt.title), shell=True)
# Delete video file
subprocess.call("rm video.mp4", shell=True)
stream("https://www.youtube.com/watch?v=S0LU-vbF1tk")
import os
from moviepy.editor import VideoFileClip
stream_url = "http://video.cdn.uk.com/480/20180708/3640/6693/6693-480.m3u8"
clip = (VideoFileClip(stream_url)).subclip(0, 100)
clip.write_videofile("/home/ubuntu/vids/myvideo.mp4")
import videostream
##trying to stream m3u8 url to rtmp
rtmp_url = "rtmp://rtmp.example.com/events/stream"
m3u8_url = "https://path/to/m3u8/file"
vs = videostream.VideoStream(rtmp_url)
vs.start()
vs.send(m3u8_url)
def print_centralized_words(words):
# TODO: write your code here
pass
The function prints out a random password of a length specified by the user.
import shutil
def line():
cols, _ = shutil.get_terminal_size()
return "=" * cols
line()
from string import ascii_letters, digits, punctuation
print(string)
import time
import sys
import os
import subprocess
import signal
from rtmpy import RTMP
def handler(signum, frame):
print "Stop streaming"
sys.exit(0)
signal.signal(signal.SIGINT, handler)
# check for correct number of arguments
if len(sys.argv) != 3:
print 'Usage: python stream.py add rtmp://your_server_address/live/your_stream_key'
sys.exit(1)
stream = RTMP(sys.argv[1], out=sys.argv[2])
stream.start()
while True:
time.sleep(1)
The above script generates a random password. It takes the length from the user and generates the length password using the ascii values.
import csv
def create_csv_file():
csv_columns = ['user id', 'password']
try:
with open('user_pass.csv', 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
writer.writeheader()
user_id = input('Enter user id: ')
password = input('Enter password: ')
writer.writerow({'user id': user_id, 'password': password})
except IOError:
print("I/O error")
import youtube_dl
ydl_opts = {
'format': 'bestvideo+bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download(['https://www.youtube.com/watch?v=BaW_jenozKc'])
from streamz import Stream
import subprocess
def start_stream(url, rtmp_url):
command = f'ffmpeg -re -i {url} -c copy -f flv {rtmp_url}'
process = subprocess.Popen(command.split(), stdout=subprocess.PIPE)
for line in process.stdout:
print(line)
source_stream = Stream()
source_stream.map(
start_stream,
'http://test.m3u8',
'rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx')
source_stream.sink(lambda _: print("We're done!"))
def ffmpeg(URL):
stream = ffmpeg.input(URL)
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx-xxxx', format='flv', vcodec="copy", acodec="aac", ac="2", ar="44100", ab="128k", strict="-2", flags="+global_header", bufsize="3000k")
ffmpeg.run(stream)
# get it from user input
URL = input("Enter here: /n")
ffmpeg(URL)
def live_to_youtube(url, api_key, channel_id):
r = requests.get(url, stream=True)
r.raw.decode_content = True
filename = 'tempfile.ts'
l = 0
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
l += 1
if l >= 60:
live_stream(filename, api_key, channel_id)
l = 0
f.close()
def add(a, b):
return a + b
#include <stdio.h>
#include <stdlib.h>
int main()
{
printf("Hello world!\n");
return 0;
}
import shutil
def output_line():
return print('=' * shutil.get_terminal_size().columns)
while True:
constants = ascii_letters + digits + punctuation
password_length = int(input("What's the length of the password? "))
password = ""
for i in range(password_length):
password += random.choice(constants)
print(password)
if input("Want another? ").lower() == "no":
break
print ("Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together. Python's simple, easy to learn syntax emphasizes readability and therefore reduces the cost of program maintenance. Python supports modules and packages, which encourages program modularity and code reuse.")
def print_center(word):
print ' ' * ((80 - len(word))/2) + word
print_center('Test')
import pyffmpeg
stream = pyffmpeg.VideoStream()
stream.open('http://sample.vodobox.net/skate_phantom_flex_4k/skate_phantom_flex_4k.m3u8', is_input = True)
stream.open('rtmp://a.rtmp.youtube.com/live2/6qg3-fj7w-xjxh-z6b5', is_output = True)
stream.start()
import videostream
import subprocess
def stream(url, rtmp):
stream = videostream.VideoStream(url)
stream.open()
stream.stream(rtmp)
print ('Streaming to rtmp')
#Streaming to rtmp
stream('https://my-domain.com/playlist.m3u8', 'rtmp://my-domain.com/live/stream')
from moviepy.editor import VideoFileClip
# Replace with the actual M3U8 URL
url = "https://xxxxxxx.m3u8"
# Open the video file
clip = VideoFileClip(url)
# Set the codecs and other options
clip = clip.set_fps(30).set_audio_fps(44100).set_audio_bitrate(128)
# Write the video to file
clip.write_videofile("output.flv", codec='libx264', audio_codec='aac')
def addStudent():
name = input('Student Name: ')
roll = input('Student Roll: ')
cls = input('Student Class: ')
phone = input('Student Phone: ')
# add code to add the student data to mysql database
return 'Student Added'
addStudent()
r = requests.get('https://vod-api.streaming.media.azure.net/api/v2/assets/8bdbcb55-7cce-4a5b-9f10-c3f3a20c2857/locators/8a3a3b3f-d640-4b7f-9b42-77f37d78c4e1/assetfiles/2d33b49f-bfe4-4e0a-a4b4-ad3dd54bdf8a/media/manifest(format=m3u8-aapl)', headers={'Authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6IlhHZU5rZU5VZU5pcE9TR0VUQzZoQk5QQSJ9.eyJhdWQiOiJodHRwczovL3ZvZC1hcGkuc3Ry
def print(statement):
return statement
print("hello world")
import ffmpeg
from ffmpeg._run import run_piped, run_async
async def stream(source, stream_key):
stream_server = 'rtmp://a.rtmp.youtube.com/live2'
await run_async(
[
ffmpeg.input(source),
ffmpeg.output(
stream_server,
**{
'c:v': 'copy',
'c:a': 'copy',
'f': 'flv',
'stream_key': stream_key
}
)
]
)
print(message)
import ffmpeg_streaming
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
url = 'https://XXX.m3u8'
stream = ffmpeg_streaming.input(url)
stream \
.output('rtmp://a.rtmp.youtube.com/live2/XXXXX-XXXX-XXXX-XXXX',
format='flv',
vcodec='h264',
vbitrate=Bitrate(1200, 1200, 'k'),
acodec='aac',
abitrate=Bitrate(160, 160, 'k'),
aspect='16:9',
strict='experimental',
) \
.run()
import subprocess
subprocess.Popen(["rtmpy", "--in", "rtmp://localhost/live", "--out", "rtmp://a.rtmp.youtube.com/live2/STREAM_KEY"])
def full_line(title):
print('=' * 80)
print(title.center(80))
print('=' * 80)
full_line('This is a test')
import moviepy.editor as mp
clip = mp.VideoFileClip("http://file-examples.com/wp-content/uploads/2017/04/file_example_MP4_1280_10MG.mp4")
clip.write_videofile("video.mp4")
def stream_m3u8(m3u8_link, rtmp_link):
ff = pyffmpeg.FFMpeg()
input = ff.input(m3u8_link)
output = ff.output(rtmp_link, vcodec='copy', acodec='copy', vsync='cfr', r='30', format='flv', f='flv')
input.output(output, end=0, acodec='copy', vcodec='copy').run()
print("Hello")
user_input = input("what is your name")
print("Hello," + user_input)
# to convert the input to int
a = int(input("give me a number"))
b = int(input("give me another number"))
print(a + b)
# calling a function
a = 5
b = 4
def add(a, b):
return a + b
result = add(a, b)
print(result)
# you can also do this
print(add(a, b))
# putting this to use
def add(a, b):
return a + b
def subtract(a, b):
return a - b
def multiply(a, b):
return a * b
def divide(a, b):
return a / b
print(add(1, 2))
print(subtract(1, 2))
print(multiply(1, 2))
print(divide(1, 2))
from subprocess import Popen, PIPE
def youtube_stream(url, streamKey):
ffmpeg = ['ffmpeg', '-i', url,
'-vcodec', 'copy', '-acodec', 'aac', '-ar', '44100', '-f', 'flv', streamKey]
p = Popen(ffmpeg, stdout=PIPE, stderr=PIPE)
while True:
output = p.stderr.readline()
if output == '' and p.poll() is not None:
break
if output:
print(output.strip().decode('utf-8'))
from rtmpy import Server
Server()
.add_stream(
url='http://domain.com/live/stream.m3u8',
name='stream',
output='rtmp://a.rtmp.youtube.com/live2/[key]')
.serve_forever()
a = '''this is a multi-line string.
this is the second line.'''
print(a)
fix invalid codeSun, 11 Dec 2022 def add(a, b):
return a + b
def full_line(sign):
print(sign * terminal_width)
#code
def print_centered(word):
print(word.center(50))
print_centered("Hello")
def full_line(a):
return a * "="
full_line(5)
import os
import moviepy.editor as mp
import argparse
import apiclient.discovery
import httplib2
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import argparser, run_flow
def youtube_upload(mp4file, title, description, keywords):
# Explicitly tell the underlying HTTP transport library not to retry, since
# we are handling retry logic ourselves.
httplib2.RETRIES = 1
# Maximum number of times to retry before giving up.
MAX_RETRIES = 10
# Always retry when these exceptions are raised.
RETRIABLE_EXCEPTIONS = (httplib2.HttpLib2Error, IOError, httplib2.ServerNotFoundError)
# Always retry when an apiclient.errors.HttpError with one of these status
# codes is raised.
RETRIABLE_STATUS_CODES =
def stream(url, rtmp):
import time, uuid, urllib, os, signal, sys, requests, json, subprocess
session = str(uuid.uuid1())
while True:
response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36 OPR/67.0.3575.53'})
ts = list(map(lambda x: x.split('\n')[0], response.text.split('#EXTINF')))[1:]
ts = list(map(lambda x: x.split(',')[1], ts))
ts = list(map(lambda x: 'http://' + url.split('/')[2] + x, ts))
ts = list(map(lambda x: x.split('?')[0], ts))
with open('/tmp/{
def live_stream_m3u8_to_rtmp():
format = "rtmp://%s/%s/%s" % (
live_stream_server_info['ip'],
live_stream_server_info['app'],
live_stream_server_info['stream_name']
)
chunked_encoder = PyChunkedEncode(
format,
live_stream_server_info['username'],
live_stream_server_info['password'],
live_stream_server_info['key'],
live_stream_server_info['log_level'])
chunked_encoder.start()
chunked_encoder.join()
from subprocess import Popen, PIPE
import requests
import os
def hls2rtmp(hls_url, rtmp_url):
"""
hls_url: https://baidu.com/hls/stream.m3u8
rtmp_url:rtmp://baidu.com/live/stream
"""
rtmpdump_cmd = [
"rtmpdump",
"-r", hls_url,
"-o",
"- | ffmpeg -i - -c copy -f flv " + rtmp_url
]
p1 = Popen(rtmpdump_cmd, stdout=PIPE)
p2 = Popen(["ffmpeg", "-i", "-", "-c", "copy", "-f", "flv", rtmp_url], stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()
output = p2.communicate()[0]
hls2rtmp("https://example
def live_youtube_push(filename, title, description):
"""
:param filename: input file
:param title: video title
:param description: video description
:return: None
"""
# Specify the path to the ffmpeg binary.
ffmpeg_path = "ffmpeg"
# Create a new PyChunkedEncode object.
video_stream = PyChunkedEncode(ffmpeg_path)
# Specify the stream URL.
#url = "https://a.rtmp.youtube.com/live2/{}".format(youtube_key)
url = "rtmp://a.rtmp.youtube.com/live2/{}".format(youtube_key)
# Set the stream URL.
video_stream.set_url(url)
# Specify the stream resolution.
#video_stream.set_resolution(640, 360)
# Specify the stream framerate.
video_stream.set_framerate(60)
# Specify the stream bitrate.
def remove_a(filename, new_file):
with open(filename, 'r') as file:
f1 = file.readlines()
with open(new_file, 'w') as file1:
for line in f1:
if 'a' not in line:
file1.write(line)
remove_a('file.txt', 'file1.txt')
import subprocess
import time
while True:
process = subprocess.Popen('ffmpeg -i "http://streaming.mrt.ac.lk/live/tv1/playlist.m3u8" -c copy -f flv "rtmp://xxxxxxxxxx"', shell=True, stdout=subprocess.PIPE)
time.sleep(1)
def stream(m3u8_url, rtmp_url):
command = [
'ffmpeg',
'-i',
m3u8_url,
'-c',
'copy',
'-f',
'flv',
rtmp_url
]
subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def add(a, b):
return a + b
import json
import time
import datetime
import subprocess
import sys
import pym3u8
import os
def stream_to_rtmp(url, rtmp_url, duration = 0):
if not os.path.isdir("logs"):
os.makedirs("logs")
# print(url)
sys.stdout.write("{}:\tStarted: {}\n".format(rtmp_url, datetime.datetime.now()))
sys.stdout.write("{}:\tStream: {}\n".format(rtmp_url, url))
process = subprocess.Popen(['ffmpeg',
'-i', url,
'-acodec', 'copy',
'-vcodec', 'copy',
'-f', 'flv', rtmp_url], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
log_file = open("logs/{}.txt".format(rtmp_url), '
def fill_line(s):
print(s*80)
fill_line("=")
import string
def print_all(a):
print(a)
return a
print_all(string.ascii_letters)
print_all(string.ascii_lowercase)
print_all(string.ascii_uppercase)
print_all(string.digits)
print_all(string.hexdigits)
print_all(string.whitespace)
print_all(string.punctuation)
"""
"""
'''
'''
# String concatenation
a = 'hello'
b = 'world'
c = a + b
print(c)
a = 'hello'
b = 'world'
c = a + ' ' + b
print(c)
# String casting
a = 10
b = '20'
c = a + int(b)
print(c)
# Specify data type
a = 10
b = '20'
c = a + int(b)
print(type(c))
print(c)
# Format
a = 10
b = '20'
c = '{0} + {1} = {2}'.format(a, b, a + int(b))
print(c)
def fullLine():
output.setSize(3)
print('=' * output.getWidth())
fullLine()
CREATE TABLE program(
number_of_lines INT NOT NULL,
language VARCHAR(50) NOT NULL,
id INT NOT NULL PRIMARY KEY,
description VARCHAR(100) NOT NULL,
);
fix invalid codeSun, 11 Dec 2022
'''
def alter_table(cursor,table_name,column_name,data_type):
query = "ALTER TABLE "+table_name+" ADD "+column_name+" "+data_type+";"
cursor.execute(query)
connection.commit()
print("Table altered successfully")
alter_table(cursor,"student","enrollment", "int(10)")
def update_table(cursor,table_name,column_name,value,condition):
query = "UPDATE "+table_name+" SET "+column_name+" = '"+value+"' WHERE "+condition+";"
cursor.execute(query)
connection.commit()
print("Table updated successfully")
update_table(cursor,"student","enrollment","123456789","name = 'charlie'")
def delete_table(cursor,table_name,condition):
query = "DELETE FROM "+table_name+" WHERE "+condition+";"
cursor.execute(query)
connection.commit()
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
video_stream = ffmpeg_streaming.input('https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8')
audio_stream = video_stream.audio
video_stream = video_stream.video
video_stream = video_stream.filter('scale', Size(1280, 720), force_original_aspect_ratio='increase')
video_stream = video_stream.filter('pad', Size(1280, 720), Color(0, 0, 0))
video_stream = video_stream.output('test.mp4', format=Formats.mp4, vcodec='libx264', acodec='aac')
video_stream.run()
def download(link):
ydl_opts = {}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([link])
download("https://www.youtube.com/watch?v=Nq2wYlWFucg")
translateTue, 03 Jan 2023 def add(a, b):
return a + b
import shutil
def fullline():
columns = shutil.get_terminal_size().columns
print("="*columns)
def stream_m3u8(url, rtmp):
stream = pyRTMP.RTMP(rtmp)
stream.open()
m3u8 = requests.get(url, headers=headers)
m3u8_text = m3u8.text
for line in m3u8_text.splitlines():
if line.startswith("#EXTINF:"):
ts_url = base_url + line.split(",")[1].strip()
ts_data = requests.get(ts_url, headers=headers)
stream.write(ts_data.content)
print("wrote: " + ts_url)
#importing pandas library
import pandas as pd
#creating a dataframe
df = pd.DataFrame({"Roll No.": [1,2,3,4,5,6,7,8,9,10], "Name": ["A","B","C","D","E","F","G","H","I","J"],"Marks":[65,75,85,84,93,95,80,88,78,86]})
#saving dataframe to csv file
df.to_csv("data.csv")
#opening dataframe from csv file
df2 = pd.read_csv("data.csv")
#updating dataframe
df2 = df2.append({'Roll No.':4, 'Name':'D', 'Marks':85}, ignore_index=True)
#saving dataframe to csv file
df2.to_csv("data.csv")
def stream_url(url):
# url = 'm3u8 url'
# url = 'https://videos3.earthcam.com/fecnetwork/10785.flv/playlist.m3u8'
s = FLVStream(url)
s.open()
rtmp = 'rtmp://xxxxxxxxxxxx'
s.publish(rtmp, 'live')
s.pipe.run()
s.close()
stream_url(url)
import moviepy.editor as mp
def stream_video_from_m3u8_to_rtmp_server(m3u8_url, rtmp_server, rtmp_stream_key):
# Open the video file
clip = mp.VideoFileClip(m3u8_url)
# Set the codecs and other options
clip = clip.set_fps(30).set_audio_fps(44100).set_audio_bitrate(128)
# Write the video to file
clip.write_videofile(rtmp_server, codec='libx264', audio_codec='aac')
def password_generator(password_length):
from string import *
constants = ascii_letters + digits + punctuation
password = ""
for i in range(password_length):
password += random.choice(constants)
print(password)
password_length = int(input("What's the length of the password? "))
password_generator(password_length)
def run_again():
while True:
again = input("Do you want to run again? ")
if again == "yes":
return True
elif again == "no":
return False
#!/usr/bin/env python
from pym3u8 import load
from pysrt import SubRipFile
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from datetime import datetime
import time
import re
import os
m3u8_url = 'http://160.111.144.40:1935/live/stream/playlist.m3u8'
rtmp_url = 'rtmp://localhost/live'
key = 'Q4B4Cw6n'
def run_process(cmd):
p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
out, err = p.communicate()
return out, err
def get_subtitles(subtitles_url):
subtitles_url = subtitles_url.replace('\n', '').replace('\r', '')
if subtitles_url:
try:
response = urllib2.urlopen(subtitles_url)
def stream(url):
import os
t=os.popen("avconv -re -i "+url+" -ac 2 -ar 44100 -vcodec libx264 -vb 20M -c:a aac -strict experimental -f flv rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_CODE").read()
return t
stream("http://my.url.com/my.m3u8")
The above code randomly generates a password with the length you want.
import shutil
def print_centered(text):
screen_width = shutil.get_terminal_size().columns
text_width = len(text)
left_margin = (screen_width - text_width) // 2
print(' ' * left_margin, text)
print_centered('Hello, World!')
import time
import datetime as dt
import numpy as np
import pandas as pd
import streamz
import streamz.dataframe
import bokeh.models as bm
import bokeh.plotting as bp
from tornado.ioloop import IOLoop
import tornado.web
import requests
import pandas as pd
# import data
df = pd.read_csv('../data/pandas_data.csv',
parse_dates=['datetime'],
index_col=['datetime'])
# create a streamz dataframe
source = streamz.dataframe.DataFrame(streaming=True, example=df)
# create a buffer of the dataframe
buffer = source.buffer(10, dropna=True)
# create a rolling mean
rolling_mean = buffer.rolling(12).mean()
# create a rolling standard deviation
rolling_std = buffer.rolling(12).std()
# calculate the upper and lower bounds of the Bollinger Bands
upper_bound = rolling
print("Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together. Python's simple, easy to learn syntax emphasizes readability and therefore reduces the cost of program maintenance. Python supports modules and packages, which encourages program modularity and code reuse."
fix invalid codeSun, 11 Dec 2022
import os
from subprocess import Popen, PIPE
from threading import Thread
from flask import Flask, send_from_directory, Response
from flvstreamer import FLVStreamer
app = Flask(__name__)
def stream_thread(url):
flv = FLVStreamer(url)
flv.start()
@app.route('/')
def stream():
url = 'http://' + os.getenv("URL")
thread = Thread(target=stream_thread, args=(url,))
thread.start()
return Response(flv.stream(), mimetype='text/event-stream')
def add(a, b):
return a + b
add(1, 2)
def youtube_stream(url):
command = "ffmpeg -user_agent 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36' -i {0} -vcodec copy -acodec copy -f flv rtmp://a.rtmp.youtube.com/live2/{1}".format(url, YOUTUBE_CHANNEL_ID)
subprocess.call(command, shell=True)
import youtube_dl
ydl_opts = {
'format': 'mp4',
'no-playlist': True,
'quiet': True,
'no-warnings': True
}
def tube(video_id):
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download(['https://www.youtube.com/watch?v={}'.format(video_id)])
return
from python_ffmpeg_video_streaming import Streaming
url = "https://...."
streamer = Streaming(url)
streamer.set_youtube_key("Youtube_API_Key")
streamer.start()
import ffmpeg
from ffmpeg import *
def stream_m3u8_to_rtmp(m3u8_url, rtmp_url):
ff = FFmpeg(
inputs={m3u8_url: None},
outputs={rtmp_url: None}
)
ff.run()
stream_m3u8_to_rtmp('http://192.168.0.23/hls/test.m3u8', 'rtmp://localhost/live/test')
def m3u8_to_rtmp(m3u8, rtmp):
ts_list = parse_m3u8_ts(m3u8)
if ts_list is None:
print('Failed to parse m3u8 file')
return False
# get stream from ts file
ts_stream = open_ts_stream(ts_list[0])
# create rtmp stream
rtmp_stream = create_rtmp_stream(rtmp)
# continue stream ts to rtmp
stream_ts_to_rtmp(ts_stream, rtmp_stream)
def streamM3U8(url, host="rtmp://127.0.0.1:1935/live"):
req = urllib.request.Request(url)
res = urllib.request.urlopen(req)
res2 = res.read().decode('utf-8')
res.close()
#print(res2)
patt = re.compile('(^.*?\n)')
res3 = patt.findall(res2)
tsUrl = res3[-1].strip()
#print(tsUrl)
#tsUrl = "http://192.168.1.4/live/test.ts"
return streamTS(tsUrl, host)
def streamTS(url, host = "rtmp://127.0.0.1:1935/live"):
size = 1024
stream = urllib.request.urlopen(url)
#print(stream.info())
#print(stream.getcode())
#print(stream.get
The sum of values in the list
import random
def dice():
return random.randint(1, 6)
import ffmpy
import subprocess
import shlex
import sys
import os
def execute(cmd):
""" Function to execute a system command and return the output.
"""
args = shlex.split(cmd)
try:
output = subprocess.check_output(args, stderr=subprocess.STDOUT).decode("utf-8")
return output
except:
return None
if __name__ == '__main__':
# URL to the m3u8 file
url = 'http://playertest.longtailvideo.com/adaptive/bbbfull/bbbfull.m3u8'
# output RTMP URL
output = 'rtmp://YOUR_RTMP_URL'
# command to fetch key
cmd = 'ffprobe -v quiet -print_format json -show_format -show_streams ' + url
output = execute(cmd)
if output is not None:
key = None
# if key is present, fetch it
if '#EXT
def password_generator():
import random
from string import *
constants = ascii_letters + digits + punctuation
print(constants)
password_length = int(input("What's the length of the password? "))
password = ""
for i in range(password_length):
password += random.choice(constants)
print(password)
from pyyoutube import Api
from pyyoutube.utils.arguments import parse_cli_args
from pyyoutube.utils.stream import streaming_from_m3u8
from pyyoutube import Api
from ffmpeg import FFMpeg
from ffmpeg.utils.arguments import parse_cli_args
from ffmpeg.utils.stream import streaming_from_m3u8
from ffmpeg.services.youtube import Youtube
from pyyoutube.utils.ffmpeg import FFMpeg
from pyyoutube.services.youtube import Youtube
api_key = "AIzaSyBk-GZ1EwZiCKDYnI2YKXyX9o6PnL-WpKM"
api = Api(api_key=api_key)
def stream():
while True:
try:
stream_data = StreamData(url=url)
stream_data.get_m3u8_data()
stream_data.get_ts_data()
stream_data.stream_to_rtmp()
except Exception as e:
print("ERROR: %s" % e)
time.sleep(5)
import random
from string import *
constants = ascii_letters + digits + punctuation
print(constants)
password_length = int(input("What's the length of the password? "))
password = ""
for i in range(password_length):
password += random.choice(constants)
print(password)
import string
print(string.ascii_letters + string.digits + string.punctuation)
import random
def roll_dice(n):
return random.randint(1, 6)
roll_dice(1)
import shutil
def line(a):
return shutil.get_terminal_size().columns * a
line("=")
def print_line(char="="):
print(char * 80)
print_line()
print_line("*")
import string
def get_all_chars():
all_chars = ''
for char1 in string.ascii_letters:
all_chars += char1
for char2 in string.digits:
all_chars += char2
for char3 in string.punctuation:
all_chars += char3
return all_chars
print(get_all_chars())
def print_centered(phrase):
print(phrase.center(50))
CREATE PROGRAM 'school_management_program'
LANGUAGE SQL
SQL SECURITY CREATOR
NOT DETERMINISTIC
CONTAINS SQL
SET OPTION
EXTERNAL NAME 'school_management_program'
while True:
choice = input("Would you like to restart the program?")
if choice == 'yes':
continue
elif choice == 'no':
print('Thank you for using this program!')
break
else:
print('Please enter yes or no')
continue
import ffmpeg
stream = ffmpeg.input('http://ip/live/ch1.m3u8')
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/YOUR_KEY')
ffmpeg.run(stream)
def full_line():
print("="*shutil.get_terminal_size().columns)
from string import *
constants = ascii_letters + digits + punctuation
print(constants)
import ffmpeg_streaming
import ffmpeg
stream = ffmpeg_streaming.Stream(url="http://st-kv-vod.akamaized.net/b542d8a4f38a4e6e8ff0a9c9d83aa7b4/4a2f48d3/stv2/index.m3u8")
file_name = "big_buck_bunny_720p_10mb.mp4"
with open(file_name, 'wb') as f:
for data in stream.iter_content():
f.write(data)
input = ffmpeg.input(file_name)
input = ffmpeg.output(file_name)
input.run()
import string
print(string.ascii_letters + string.digits + string.punctuation)
def stream_to_youtube(m3u8_url, title, description, category, tags, output_path=None):
"""
:param m3u8_url: url of m3u8 file
:param title: video title
:param description: video description
:param category: video category
:param tags: video tags (list)
:param output_path: path for the output file (default=None)
:return: output_filename (if output_path=None) or True
"""
# modify the avconv command with your own configuration
command = [
"avconv",
"-i", m3u8_url,
"-c:v", "libx264",
"-c:a", "copy",
"-threads", "0",
"-b:v", "1000k",
"-b:a", "64k",
"-r", "25",
"-f", "mp4",
"-movflags", "frag_keyframe+empty_moov",
import requests
import subprocess
import threading
import time
import socket
import urlparse
import sys
import os
def get_local_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
def get_public_ip():
return requests.get('http://ip.42.pl/raw').text
def get_url_ip(url):
return socket.gethostbyname(urlparse.urlparse(url).hostname)
def get_public_ip_from_url(url):
return get_url_ip(url)
def check_ip_available(ip):
if ip == '' or ip == "127.0.0.1":
return False
else:
return True
def get_ip_from_url(url):
return check_ip_available(get_public_ip_from_url(url))
def stream_video_to_youtube(m3u8_url, title, description, category_id, client_secrets_file, credentials_file):
import moviepy.editor as mp
import os
import subprocess
print("Clip")
clip = mp.VideoFileClip(m3u8_url)
print("Write")
clip.write_videofile("video.mp4")
print("Upload")
subprocess.call(['python', 'upload_video.py', '--file', 'video.mp4', '--title', title, '--description', description, '--category', category_id, '--keywords', '', '--privacyStatus', 'unlisted', '--client_secrets', client_secrets_file, '--credentials', credentials_file])
os.remove("video.mp4")
stream_video_to_youtube("https://s3.amazonaws.com/vod-demo/index.m3u8", "VOD", "Video on demand",
from ffmpy import FFmpeg
def stream(url):
ff = FFmpeg(
inputs={url: None},
outputs={
'rtmp://a.rtmp.youtube.com/live2/x/your_stream_key':
'-c:v libx264 -preset veryfast -maxrate 3000k -bufsize 6000k -pix_fmt yuv420p -g 50 -c:a aac -b:a 160k -ac 2 -ar 44100 -f flv'
}
)
ff.run()
import sqlite3
connection = sqlite3.connect(':memory:')
cursor = connection.cursor()
cursor.execute('CREATE TABLE student(roll_no INTEGER PRIMARY KEY, name TEXT, age INTEGER);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student0", 9);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student1", 10);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student2", 8);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student3", 9);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student4", 8);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student5", 10);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student6", 8);')
cursor.execute('INSERT INTO student(name, age) VALUES ("student7", 9);
def greeting(name: str) -> str:
return 'hello' +name
def stream_m3u8_to_rtmp(m3u8_url, rtmp_url):
Gst.init([])
playbin = Gst.ElementFactory.make("playbin", "playbin")
playbin.set_property('uri', m3u8_url)
playbin.set_property('buffer-size', 20 * 1024 * 1024)
playbin.set_property('buffer-duration', 20 * Gst.SECOND)
playbin.set_property('latency', 20 * Gst.SECOND)
playbin.set_property('connection-speed', 1000)
playbin.set_property('timeout', 1000000000)
playbin.set_property('buffer-mode', 2)
playbin.set_property('ring-buffer-max-size', 10 * 1024 * 1024)
playbin.set_property('max-lateness', -1)
# Add RTMP sink
sink = Gst.ElementFactory.make('rtmpsink', 'sink')
sink.
import moviepy.editor as mp
url = 'https://xxxx.m3u8'
clip = mp.VideoFileClip(url)
clip.write_videofile('output.mp4')
def fillLine(ch, n):
for i in range(n):
print(ch, end="")
print()
fillLine("=", 50)
def printPython():
print("Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together. Python's simple, easy to learn syntax emphasizes readability and therefore reduces the cost of program maintenance. Python supports modules and packages, which encourages program modularity and code reuse.")
printPython()
fix invalid codeSun, 11 Dec 2022
import subprocess
rtmp_url = 'rtmp://192.168.2.2/live/test'
m3u8_url = 'http://example.com/playlist.m3u8'
cmd = "ffmpeg -i '{0}' -c copy -f flv '{1}'".format(m3u8_url, rtmp_url)
p = subprocess.Popen(cmd, shell=True)
p.wait()
import moviepy.editor as mp
video=mp.VideoFileClip("F:\moviepy.mp4")
video.audio.write_audiofile("audio.mp3")
def stream_m3u8(url, rtmp_url):
stream = ffmpeg.input(url)
stream = ffmpeg.output(stream, rtmp_url, vcodec='copy', acodec='copy', format='flv')
ffmpeg.run(stream)
stream_m3u8('http://....', 'rtmp://....')
def add(a, b):
return a + b
add(1, 2)
from moviepy.editor import *
clip = VideoFileClip('http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4')
clip.write_videofile("test.mp4")
import ffmpeg
from ffmpy3 import FFmpeg
stream = FFmpeg(
inputs={'http://192.168.0.131:8081/hls/live.m3u8': None},
outputs={'rtmp://a.rtmp.youtube.com/live2/x/y': None}
)
stream.run()
def print_centered(word):
print(word.center(30))
print_centered('Python')
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Represent
the function takes a list and an integer and returns a list with the same elements as the list but with the integer as the length of the list
from moviepy.editor import *
def save_video(url, output_file):
clip = VideoFileClip(url)
clip.write_videofile(output_file)
save_video("http://example.com/file.m3u8", "output.mp4")
def add(a, b):
return a + b
add(1, 2)
def ask_user():
user_input = input("Press Q to quit: ").lower()
if user_input == "q":
return False
else:
return True
while ask_user():
print("Looping")
import mysql.connector
from mysql.connector import Error
import os
from subprocess import Popen, PIPE
def backup():
try:
connection = mysql.connector.connect(host='127.0.0.1',
database='my_database',
user='root',
password='password')
if connection.is_connected():
db_Info = connection.get_server_info()
print("Connected to MySQL Server version ", db_Info)
cursor = connection.cursor()
cursor.execute("select database();")
record = cursor.fetchone()
print("Your connected to database: ", record)
pipe = Popen(['/usr/bin/mysqldump', '-u', 'root', '--databases', 'my_database', '--password=password'], stdout=PIPE)
output = pipe.communicate()[0]
print(output)
with open('/home/database_backup.sql', 'wb') as f
import string
string.ascii_letters + string.digits + string.punctuation
def stream(url, title):
if '?type=m3u8_native' in url:
url = url.replace('?type=m3u8_native', '?type=m3u8')
url = url.replace('/manifest.m3u', '/index_1_av.m3u8')
clip = VideoFileClip(url).resize(width=854)
clip.write_videofile(str(title + '.mp4'))
def m3u8_to_rtmp(url, stream_name, local_port=10000, time_interval=5):
print("start streaming m3u8 to rtmp\n")
stream_cmd = "ffmpeg -i %s -c copy -f flv rtmp://localhost:%s/%s" % (url, local_port, stream_name)
print(stream_cmd)
stream_subprocess = subprocess.Popen(stream_cmd, shell=True, stderr=subprocess.STDOUT)
print("streaming pid: %s\n" % stream_subprocess.pid)
print("streaming url: %s\n" % url)
print("streaming stream name: %s\n" % stream_name)
print("streaming local port: %s\n" % local_port)
print("streaming time interval: %s\n" % time_interval)
while True:
time.sleep(time_interval)
if stream_subprocess.poll() is
import subprocess
import shlex
def stream():
src = "http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4"
dst = "rtmp://192.168.0.101:1935/live/stream"
return subprocess.Popen(shlex.split('ffmpeg -re -i "{0}" -c copy -f flv "{1}"'.format(src, dst)))
if __name__ == '__main__':
p = stream()
p.wait()
print(p.returncode)
import streamlink
import subprocess
# M3U8 stream
m3u8_stream_url = "https://www.youtube.com/watch?v=yVUbUYq3wq4"
# RTMP stream
rtmp_stream_url = "rtmp://rtmp.example.com:1935/app/stream_name"
# Get streams
streams = streamlink.streams(m3u8_stream_url)
# Get best quality stream
stream = streams['best']
# Get stream url
stream_url = stream.url
# Create command to stream m3u8 url to rtmp
command = ["ffmpeg", "-i", stream_url, "-c:v", "copy", "-c:a", "aac", "-strict", "-2", "-f", "flv", rtmp_stream_url]
# Stream to rtmp server
subprocess.call(command)
def stream_m3u8_to_rtmp(m3u8_url, rtmp_url):
"""
Stream m3u8 url to rtmp url
"""
base_url = os.path.dirname(m3u8_url)
m3u8 = requests.get(m3u8_url)
m3u8_content = m3u8.text
m3u8_list = m3u8_content.split('\n')
ts_list = []
for item in m3u8_list:
if item.endswith('.ts'):
ts_list.append(item)
ts_list = [base_url + '/' + ts for ts in ts_list]
p = subprocess.Popen(['ffmpeg', '-y',
'-i', 'concat:' + '|'.join(ts_list),
'-c', 'copy',
'-f', 'flv',
'-metadata',
def print_centralized(word):
print(word.center(100))
print_centralized("Hello world")
is shorthand for
def rtmpm3u8(url, rtmp_url):
rtmp_conn = rtmpy.Connection(rtmp_url)
rtmp_conn.connect()
stream_type='live'
rtmp_conn.create_stream(type=stream_type)
m3u8_obj = m3u8.loads(requests.get(url).text)
master_playlist = m3u8.load(url)
for playlist in master_playlist.playlists:
print(playlist.uri)
segment_urls = [urljoin(url, segment.uri) for segment in playlist.segments]
rtmp_conn.publish()
for segment_url in segment_urls:
segment_obj = requests.get(segment_url).content
rtmp_conn.write(segment_obj, timestamp=0)
CREATE TABLE school(
sno INTEGER PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(10),
age INTEGER,
gender VARCHAR(6)
);
means the same as a = a + b
from ffmpy import FFmpeg
ff = FFmpeg(
inputs={'http://89.238.150.70:7000/live/phdtv/phdtv/1234.ts': None},
outputs={'rtmp://a.rtmp.youtube.com/live2/[youtube_key]': '-c:v libx264 -c:a aac -strict experimental -f flv -vf "scale=trunc(iw/2)*2:trunc(ih/2)*2"'}
)
ff.run()
def line():
return shutil.get_terminal_size().columns * "="
fix invalid codeThu, 29 Dec 2022 ## License
* MIT
## Contribution
If you have any ideas, just [open an issue](https://github.com/peterldowns/simple-python-pygal-chart/issues/new) and tell me what you think.
If you'd like to contribute, please fork the repository and make changes as
you'd like. Pull requests are warmly welcome.
## Donate
* [Become a patron](https://www.patreon.com/pld)
def stream_m3u8_to_rtmp(m3u8_url):
#function that: stream m3u8 url to rtmp in python with pyffmpeg
#https://pypi.org/project/pyffmpeg/
#https://blog.csdn.net/u012989420/article/details/90804562
#https://www.w3cschool.cn/python/python-install-pyffmpeg.html
#https://blog.csdn.net/qq_35378626/article/details/81053800
#https://www.cnblogs.com/kissazi2/p/11205442.html
#https://www.jianshu.com/p/dde6cf812b0b
from pyffmpeg import FFmpeg
from pyffmpeg import Input
from pyffmpeg import Output
from pyffmpeg import Format
from pyffmpeg import AudioCodec
from pyffmpeg import VideoCodec
from pyffmpeg import AVStream
from
def create_project(hotel_name, db_name, db_user, db_password):
os.system("mkdir " + hotel_name)
os.system("cd " + hotel_name)
os.system("python3 -m venv venv")
os.system("source venv/bin/activate")
os.system("pip install django")
os.system("django-admin startproject " + hotel_name)
os.system("cd " + hotel_name)
os.system("python manage.py migrate")
os.system("python manage.py startapp hotel")
os.system("cd hotel")
os.system("touch models.py")
os.system("cd ..")
os.system("python manage.py makemigrations hotel")
os.system("python manage.py migrate")
with open('hotel/models.py', 'w') as f:
f.write("from django.db import models\n\n")
f.write("class Guest(
import ffmpeg
in_file = ffmpeg.input('https://d2zihajmogu5jn.cloudfront.net/bipbop-advanced/bipbop_16x9_variant.m3u8')
out_file = ffmpeg.output(in_file, 'https://www.youtube.com/watch?v=NbVc0iZgK_8')
stack = []
stack.append(1)
stack.append(2)
stack.append(3)
stack.append(4)
print (stack)
print (stack.pop())
print (stack.pop())
print (stack.pop())
print (stack.pop())
from string import ascii_letters, digits, punctuation
from string import *
This code will import the random module and the string module.
Then it will make constants to use in the code.
Then it will ask you how long you want the password to be.
Then it will make a variable that stores the password.
Then it will make a loop that makes the length of the password as long as you want it to be.
Then it will print the password.
# import shutil to get the terminal width
import shutil
# define a function to fill the line
def fill_line():
# get the width of current terminal
width = shutil.get_terminal_size().columns
# fill the line with =
print('=' * width)
# test the function in a new cell
fill_line()
def create_school_database(school_name):
connection = mysql.connect()
cursor = connection.cursor()
school_db_name = school_name.lower().replace(' ', '_')
query = 'CREATE DATABASE IF NOT EXISTS {}'.format(school_db_name)
cursor.execute(query)
connection.commit()
connection.close()
cursor.close()
def stack():
stack = []
return stack
def push(stack, item):
stack.append(item)
def pop(stack):
if len(stack) == 0:
return None
else:
return stack.pop()
def peek(stack):
if len(stack) == 0:
return None
else:
return stack[-1]
stack = stack()
push(stack,1)
push(stack,2)
push(stack,3)
push(stack,4)
print(pop(stack))
def stream(url):
subprocess.Popen(["rtmpdump", "-q", "-r", "rtmp://192.168.1.148/live/live", "-y", "live", "-W", "https://github.com/arut/nginx-rtmp-module/archive/master.tar.gz", "-o", "test.flv"])
stream("http://192.168.1.148/live/live.m3u8")
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
import os
import shutil
def encode(in_file, out_file, height=None, width=None):
# TODO: Implement me
pass
def hls_stream(in_file, out_dir, start_time=None, duration=None):
# TODO: Implement me
pass
def stream_to_youtube(in_file, out_dir, start_time=None, duration=None):
# TODO: Implement me
pass
def stream_video_from_m3u8_url_to_youtube(url, title, description, category, keywords, privacyStatus):
stream = ffmpeg_streaming.input(url)
stream = ffmpeg_streaming.output(stream, title, description, category, keywords, privacyStatus)
ffmpeg_streaming.run(stream)
import random
def passwordGen(length):
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@£$%^&*().,?0123456789'
password = ''
for i in range(length):
password += random.choice(chars)
return password
print(passwordGen(int(input('password length? '))))
from PyChunkedEncode import chunked_to_rtmp
chunked_to_rtmp("http://playertest.longtailvideo.com/adaptive/oceans_aes/oceans_aes.m3u8", "rtmp://rtmp.example.com/app/test")
import subprocess
import requests
import json
import time
import os
def main():
baseUrl = "https://m.facebook.com"
authUrl = baseUrl + "/v2.6/dialog/oauth"
accessUrl = baseUrl + "/v2.6/oauth/access_token"
debugTokenUrl = baseUrl + "/debug_token"
app_id = "app_id"
app_secret = "app_secret"
app_token = app_id + "|" + app_secret
default_timeout = 30
default_redirect_uri = 'https://www.facebook.com/connect/login_success.html'
def get_auth_token(app_token, app_id, permissions, redirect_uri=default_redirect_uri):
url_get_params = {
'client_id': app_id,
'redirect_uri': redirect_uri,
'scope': permissions,
}
r = requests.get(authUrl, params=url_get
The for loop runs as long as the value of i is less than the value of password_length.
import subprocess
url = "https://www.youtube.com/watch?v=H542nLTTbu0"
subprocess.call(["youtube-dl", "-f", "best[ext=mp4]", url])
import sqlite3
conn = sqlite3.connect('student.db')
c = conn.cursor() #create cursor
c.execute('''DROP TABLE IF EXISTS student''')
c.execute('''CREATE TABLE student (
id INTEGER PRIMARY KEY,
name TEXT,
age INTEGER,
marks INTEGER)''')
c.execute('''INSERT INTO student VALUES (1, "Gill", 15, 80)''')
c.execute('''INSERT INTO student VALUES (2, "Ben", 14, 75)''')
c.execute('''INSERT INTO student VALUES (3, "Joe", 14, 65)''')
c.execute('''INSERT INTO student VALUES (4, "Will", 15, 45)''')
c.execute('''INSERT INTO student VALUES (5, "Calvin", 16, 80)''')
c.execute('''INSERT INTO student VALUES (6, "James", 13, 40)''')
c.execute('''
def generate_function(name, required_params, optional_params):
params = required_params
for optional_param in optional_params:
params.append(optional_param + ' = None')
params = ', '.join(params)
function = f'def {name}({params}):\n'
function += f' return ""'
return function
import requests
import subprocess
def m3u8_to_rtmp(m3u8_url,rtmp_url):
subprocess.call(["ffmpeg", "-i", m3u8_url, "-c", "copy", "-bsf:a", "aac_adtstoasc", "-f", "flv", rtmp_url])
m3u8_to_rtmp("m3u8_url","rtmp_url")
def stream_to_rtmp(url):
rtmp = RTMP()
rtmp.connect('rtmp://localhost:1936/live', playpath=url, live=True)
rtmp.write('rtmp://localhost:1936/live', playpath='test', live=True)
rtmp.close()
stream_to_rtmp('url')
from bs4 import BeautifulSoup
import requests
def scrape_function():
html = requests.get("https://www.programming-helper.com/generate-function")
soup = BeautifulSoup(html.content, 'html.parser')
return soup.find('input', {'id':'function-result'}).get('value')
scrape_function()
from pyffmpeg import FFmpeg
INPUT_FILE = "http://10.128.0.30/x/x/x.m3u8"
OUTPUT_FILE = "http://a.rtmp.youtube.com/live2/"
ff = FFmpeg(
inputs={INPUT_FILE: None},
outputs={OUTPUT_FILE: '-f flv'}
)
ff.run()
def stream_to_youtube(m3u8_url, youtube_url, quality, stream_id):
stream = ffmpeg.input(m3u8_url, **{'stimeout': '5000000', 'reconnect': '1', 'reconnect_streamed': '1', 'reconnect_delay_max': '1000', 'reconnect_attempts': '-1'})
stream = ffmpeg.output(stream, youtube_url, vcodec='h264', acodec='aac', vb="%dk" % quality, threads=2, format='flv',
**{'stimeout': '5000000', 'reconnect': '1', 'reconnect_streamed': '1', 'reconnect_delay_max': '1000', 'reconnect_attempts': '-1'})
print('Starting ffmpeg with command:')
print(ffmpeg.compile(stream))
ffmpeg.run(stream, quiet=False, overwrite_output=True)
def print_line():
"print a line of = sign according to the output screen width"
print("=" * shutil.get_terminal_size().columns)
#!/usr/bin/env python
"""
Live streaming of a m3u8 file to rtmp
The file is chunked and read in a loop. When complete it is re-read.
The ffmpeg command can be modified to suit the target RTMP server.
For example:
ffmpeg -re -i 'http://localhost:8000/path/to/file.m3u8' \
-c copy -f flv 'rtmp://a.rtmp.youtube.com/live2/stream_key_from_youtube'
"""
# https://gist.github.com/rca/5c1a539f4a4be4d4c4ff
import os
import sys
import subprocess
import time
import signal
import logging
def get_chunk_list(filename):
return [
filename,
*[f"{filename}-{i}" for i in range(1, 1000)]
]
def get_chunk_timeout(filename):
return int(os.stat(filename).st_
from rtmpy import Server, Stream
server = Server()
stream = Stream(‘output’)
server.publish(‘input’, stream)
stream.play(‘http://localhost:8080/stream.m3u8’, loop=True)
def fullLine():
width, height = shutil.get_terminal_size()
print('=' * width)
ffmpeg -i 'https://example.com/hls/movie.m3u8' -c copy -bsf:a aac_adtstoasc 'output.mp4'
import ffmpeg
(
ffmpeg
.input('https://www.youtube.com/watch?v=K_CoV_L-gOs')
.output('output.mp4')
.run()
)
The code takes the length of the password from the user and then creates a random generator using random.choice, this is then repeated for the password length.
import string
def merge():
print(string.ascii_letters)
print(string.ascii_lowercase)
print(string.ascii_uppercase)
print(string.digits)
print(string.hexdigits)
print(string.whitespace)
print(string.punctuation)
merge()
i is a variable that iterates through range(c), or 0. So, as long as i is less than c, the loop runs. The second line appends a random choice from b to a, so a will contain c characters.
from ffmpy import FFmpeg
input_file = "input.m3u8"
output_file = "output.mp4"
ff = FFmpeg(
inputs={input_file: None},
outputs={output_file: None}
)
ff.run()
def stream_to_youtube(filename):
stream = ffmpeg.input(filename)
stream = ffmpeg.output(stream,
'rtmp://a.rtmp.youtube.com/live2/' + YOUTUBE_STREAM_KEY,
vcodec='copy',
acodec='copy',
format='flv',
strict='experimental',
r=30,
preset='ultrafast',
force_key_frames='expr:gte(t,n_forced*2)')
ffmpeg.run(stream)
def stream(url, key):
r = requests.get(url, stream=True)
stream = Stream(key)
for chunk in r.iter_content(chunk_size=1024):
stream.write(chunk)
import ffmpeg
def stream(URL):
stream = ffmpeg.input(URL)
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx-xxxx', format='flv', vcodec="copy", acodec="aac", ac="2", ar="44100", ab="128k", strict="-2", flags="+global_header", bufsize="3000k")
ffmpeg.run(stream)
x = input("Enter here: /n")
stream(x)
from youtube_live_stream import youtube_live_stream
stream = youtube_live_stream()
stream.login(username='username', password='password')
stream.start_stream(url=url)
stream.stop_stream()
import random
from string import *
constants = ascii_letters + digits + punctuation
print(constants)
password_length = int(input("What's the length of the password? "))
password = ""
for i in range(password_length):
password += random.choice(constants)
print(password)
ask_user = True
while ask_user:
if input("Run again? ") == "no":
ask_user = False
def stream_to_server(ip, port, app, uri, stream_name):
rtmp = pyRTMP(ip, port, app, uri)
rtmp.connect()
rtmp.stream(stream_name)
rtmp.close()
def line_under(text):
print(text)
print(len(text)*'=')
line_under('string')
import requests
import os
import subprocess
import re
import threading
from datetime import datetime
import time
import sys
from queue import Queue
from threading import Thread
def get_m3u8_url():
url = 'https://www.youtube.com/watch?v=gJN8QyhdxL4'
r = requests.get(url)
print(r.text)
r1 = re.findall(r'"hlsManifestUrl":"(.*?)"', r.text)
return r1[0]
def download_m3u8(m3u8_url):
file_name = 'tmp.m3u8'
print('downloading m3u8...')
os.system('ffmpeg -i ' + m3u8_url + ' -c copy ' + file_name)
print('downloaded m3u8')
return file_name
def get_ts_list(m3u8_file_name):
ts_list = []
def add(a, b):
return a + b
add(1, 2)
import random
def dice():
return random.randint(1,6)
dice()
#add a comment here
import subprocess
import os
def start_streaming(m3u8,rtmp):
ffmpeg_command = "ffmpeg -i {} -acodec copy -vcodec copy -f flv -y {}".format(m3u8,rtmp)
pid = subprocess.Popen(ffmpeg_command, shell=True)
return pid
def stop_streaming(pid):
os.system("kill -9 {}".format(pid.pid))
if __name__ == "__main__":
m3u8_url = "http://127.0.0.1:5000/sample.m3u8"
rtmp_url = "rtmp://127.0.0.1:1935/live/mystream"
p = start_streaming(m3u8_url,rtmp_url)
stop_streaming(p)
import m3u8
import ffmpeg
def stream_m3u8_to_rtmp(m3u8_url, rtmp_url):
m3u8_obj = m3u8.load(m3u8_url)
m3u8_urls = []
for playlist in m3u8_obj.playlists:
m3u8_urls.append(playlist.absolute_uri)
for m3u8_url in m3u8_urls:
print("Streaming: " + m3u8_url)
stream = ffmpeg.input(m3u8_url)
stream = ffmpeg.output(stream, rtmp_url, format="flv", vcodec="copy", acodec="copy", r="30")
ffmpeg.run(stream)
stream_m3u8_to_rtmp("https://video-dev.github.io/streams/x36xhzz/x36xhzz.m3u8", "rtmp://localhost
The code above is a program in Python that generates a password of the length provided by the user. It generates the password by randomly choosing a character from a string of digits, letters, and punctuation marks.
import random
def dice():
return random.randint(1, 6)
def stream_m3u8_to_rtmp(m3u8_url, rtmp_url):
Gst.init(None)
pipeline = Gst.parse_launch("playbin uri={}".format(m3u8_url))
sink = Gst.ElementFactory.make("rtmpsink", "sink")
sink.set_property("location", rtmp_url)
pipeline.set_property("video-sink", sink)
loop = GObject.MainLoop()
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
pipeline.set_state(Gst.State.NULL)
import subprocess
url = 'http://192.168.1.1/hls/test.m3u8'
rtmp_url = 'rtmp://192.168.1.1/live/my_stream'
ffmpeg = "ffmpeg -i '{}' -vcodec copy -acodec copy -f flv '{}'".format(url, rtmp_url)
process = subprocess.Popen(ffmpeg.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
print(output)
import sys
for line in sys.stdin:
if not line.startswith('[youtube]'):
print(line)
the function will run c times and each time it will add a random letter from the list b to the string a
import random
letters = "abcdefghijklmnopqrstuvwxyz"
uppercase_letters = letters.upper()
numbers = "0123456789"
symbols = "!@#$%^&*()_+-=:;?><,.{}[]"
all = letters + uppercase_letters + numbers + symbols
password_length = int(input("What's the length of the password? "))
password = ""
for i in range(password_length):
password += random.choice(all)
print(password)
import youtube_dl
ydl_opts = {}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download(['https://www.youtube.com/watch?v=Nq2wYlWFucg'])
# Create student table
cur.execute("CREATE TABLE IF NOT EXISTS student (name TEXT, studentNumber INTEGER)")
# Insert data
cur.execute("INSERT INTO student VALUES('Bob', 123456)")
cur.execute("INSERT INTO student VALUES('Alice', 234567)")
cur.execute("INSERT INTO student VALUES('John', 345678)")
# Commit
conn.commit()
# Query
cur.execute("SELECT * FROM student")
print(cur.fetchall())
# Delete
cur.execute("DELETE FROM student WHERE name='Bob'")
# Update
cur.execute("UPDATE student SET studentNumber=987654 WHERE name='John'")
# Commit
conn.commit()
# Query
cur.execute("SELECT * FROM student")
print(cur.fetchall())
import ffmpeg_streaming
from ffmpeg_streaming import Formats, Bitrate, Representation
stream = ffmpeg_streaming.input('https://mnmedias.api.telequebec.tv/m3u8/29880.m3u8')
video = stream.video.filter('fps', fps=30, round='up')
video = video.filter('scale', width=1280, height=720)
audio = stream.audio.filter('aformat', channel_layouts='mono')
output = ffmpeg_streaming.output(stream, 'rtmp://a.rtmp.youtube.com/live2/m2k2-ttgv-wyzt-8xnx',
format=Formats.hls,
acodec='aac',
vcodec='libx264',
strict='experimental',
hls_time=10,
hls_list_size=0)
output.run()
import pygame
import os
import glob
def list_all_images(directory):
return [os.path.basename(f) for f in glob.glob(directory + "*")]
def draw_flowchart(draw, flowchart_input_text, flowchart_output_text, flowchart_flow_text):
print(flowchart_input_text)
print(flowchart_output_text)
print(flowchart_flow_text)
flowchart_input_array = flowchart_input_text.split(',')
flowchart_output_array = flowchart_output_text.split(',')
flowchart_flow_array = flowchart_flow_text.split(',')
for i in range(0, len(flowchart_input_array)):
draw.rect(screen, (0,0,0), (100,100+50*i, 40, 40), 0)
draw.rect(screen, (0,0,0), (100,100+50*i+40,
import subprocess
def main():
m3u8_url = "http://localhost:8080/master.m3u8"
stream_title = "mylive"
stream_privacy = "public"
rtmp_server = "rtmp://a.rtmp.youtube.com/live2"
rtmp_key = "12345-abcde-678910-fghij"
subprocess.call([
'ffmpeg',
'-i', m3u8_url,
'-acodec', 'copy',
'-vcodec', 'copy',
'-f', 'flv',
rtmp_server + '/' + rtmp_key
])
if __name__ == "__main__":
main()
def line():
print "=" * 80
from pytube import YouTube
from pytube.extract import apply_descrambler
from pytube.request import apply_signature
from pytube.exceptions import RegexMatchError
from pytube.helpers import safe_filename
from pytube.helpers import safe_get
from pytube.helpers import safe_print
from pytube.helpers import remove_quotes
from pytube.helpers import unescape
from pytube.helpers import video_extension
from pytube.helpers import video_id
from pytube.helpers import yt_url_validation
from pytube.helpers import Caption
from pytube.helpers import CaptionQuery
from pytube.helpers import Stream
from pytube.helpers import StreamQuery
from pytube.helpers import CaptionTrack
from pytube.contrib import annotations
from pytube.contrib import caption_converter
from pytube.contrib import app
from pytube.contrib import request
from pytube.contrib import extract
from pytube.contrib import download
from pytube.contrib import play
import subprocess as sp
rtmpUrl='rtmp://a.rtmp.youtube.com/live2/x/stream'
command = ['ffmpeg',
'-y',
'-i', 'http://localhost:8080/movie.m3u8',
'-c:v', 'libx264',
'-c:a', 'aac',
'-strict', 'experimental',
'-f', 'flv',
rtmpUrl]
sp.Popen(command)
def print_center(word):
print(word.center(30))
print_center('hongji')
import rtmpy
# connect to the stream
stream = rtmpy.Connection('rtmp://a.rtmp.youtube.com/live2/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
live=True)
# send the stream
stream.connect()
stream.create_stream()
stream.publish('live', 'live')
stream.write(open('/path/to/file.mp3', 'rb').read())
means the same thing as a = a + b
the loop will add a random element from b to a.
fix invalid codeSun, 11 Dec 2022
import string
print(string.ascii_letters + string.digits + string.punctuation)
text = "https://www.youtube.com/watch?v=Nq2wYlWFucg"
text[6:]
import subprocess
import os
import argparse
def stream(url, path):
subprocess.call(["streamlink", url, "best", "-o", path])
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='CLI')
parser.add_argument('-i', '--input', help='Input URL', required=True)
parser.add_argument('-o', '--output', help='Output File', required=True)
args = vars(parser.parse_args())
stream(args['input'], args['output'])
import string
print(string.ascii_letters + string.digits + string.punctuation)
def live_stream(m3u8_url, youtube_url):
command = "ffmpeg -i '{}' -c copy -f flv '{}'".format(m3u8_url, youtube_url)
subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
import shutil
def print_in_center(text):
terminal_size = shutil.get_terminal_size((80, 20))
text_length = len(text)
text_width = int((terminal_size.columns - text_length) / 2)
print(' ' * text_width, text)
print_in_center('Hello, world!')
fix invalid codeSun, 11 Dec 2022 - 通常来说,在代码中,我们使用的是四个空格(或者是Tab符号),而不是空格,但是在Markdown中,空格+空格,那么就会有换行的效果,所以需要在Markdown正文中的代码中使用四个空格,而在代码中使用四个空格或者Tab符号。
- 在Markdown标记语言中,如果你想要在代码中使用四个空格
def convert(url):
stream = ffmpeg.input(URL)
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx-xxxx', format='flv', vcodec="copy", acodec="aac", ac="2", ar="44100", ab="128k", strict="-2", flags="+global_header", bufsize="3000k")
ffmpeg.run(stream)
convert("Enter URL here")
def print_centered_line(text):
text = str(text)
text_len = len(text)
left_spaces = (80 - text_len) // 2
right_spaces = (80 - text_len) - left_spaces
print(' ' * left_spaces, text, ' ' * right_spaces, sep='')
print_centered_line('spam and eggs')
print_centered_line('spam, spam and eggs')
print_centered_line(12)
print_centered_line('spam, spam, spam and eggs')
def add_numbers(*args):
return sum(args)
>>> add_numbers(1, 2, 3, 4, 5)
15
import shutil
def print_centralized(string, symbol=1):
"""
print a string to the center of the terminal
:param string: The string to print
:param symbol: The width of the symbol (1 or 2)
:return: None
"""
term_size = shutil.get_terminal_size()
break_line = (term_size.columns - len(string)) // 2
if symbol == 1:
begin = "| " + "-" * break_line
end = "-" * break_line + " |"
else:
begin = "|-" + "-" * break_line
end = "-" * break_line + "-|"
print(begin + string + end)
def stream():
os.system("avconv -i https://raw.githubusercontent.com/codingforentrepreneurs/Guides/master/avconv_cheat_sheet/sample.m3u8 -c copy out.mp4")
def stream(a):
Streamlink.streams('a')['720p'].to_url()
from subprocess import run
def stream(url, name, privacy):
run(["ffmpeg", "-i", url, "-acodec", "copy", "-vcodec", "copy", "-f", "flv", "rtmp://a.rtmp.youtube.com/live2/" + name + "?privacyStatus=" + privacy])
from subprocess import Popen, PIPE, STDOUT
f = open('youtube_download.out', 'w')
out = Popen(["youtube-dl", "Nq2wYlWFucg"], stdout=PIPE, stderr=STDOUT)
for line in iter(out.stdout.readline, b''):
f.write(line.decode('utf-8'))
import os
os.system('clear')
def draw_line(size):
return '=' * size
from tkinter import *
def result(a,b):
print(a,b)
return int(a) * int(b)
def btnclick(i):
operation = i
if(operation == "add"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
elif(operation == "sub"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
elif(operation == "mul"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
elif(operation == "div"):
res = result(e1.get(),e2.get())
e3.delete(0,END)
e3.insert(0,res)
else:
def stream_to_youtube_in_python():
pass
def graph_function(function, x_range):
xs = list(x_range)
ys = []
for x in xs:
ys.append(function(x))
import matplotlib.pyplot as plt
plt.plot(xs,ys)
plt.show()
graph_function(lambda x: x**2, range(-100, 100))
from py_ts_segmenter import Segmenter
segmenter = Segmenter(
stream_input_url="http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4",
output_directory="output",
segment_duration=10,
name_prefix="movie",
output_extension=".ts",
verbose=True
)
def print_alphabet(string):
print(string.ascii_letters + string.digits + string.punctuation)
print_alphabet()
import requests
from bs4 import BeautifulSoup
import re
import os
#url = 'https://www.imdb.com/title/tt7286456/episodes'
url = 'https://www1.gogoanime.io/category/slam-dunk'
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
# print(soup.prettify())
episodes_links = []
episode_name = []
for link in soup.findAll('a'):
if '/watch' in link.get('href'):
episodes_links.append(link.get('href'))
episode_name.append(link.get('title'))
# print(episodes_links)
# print(episode_name)
# The episodes_links contains all the episodes links.
# Let's say we want to see the first episode.
for i in range(len(episodes_links)):
# html = requests.get("http://www1.gogoan
import subprocess
import os
import time
#ffmpeg -i 'https://bitdash-a.akamaihd.net/content/MI201109210084_1/m3u8s/f08e80da-bf1d-4e3d-8899-f0f6155f6efa.m3u8' -c copy -bsf:a aac_adtstoasc -f flv rtmp://a.rtmp.youtube.com/live2/zpc3-3uq3-9x7p-g5q6
ffmpegCmd = 'ffmpeg -re -i "http://live.chosun.gscdn.com/live/tvchosun1.m3u8" -c copy -bsf:a aac_adtstoasc -f flv rtmp://a.rtmp.youtube.com/live2/zpc3-3uq3-9x7p-g5q6'
subprocess.run(ffmpegCmd, shell=True)
from py_ffmpeg_video_streaming import video_streaming_function
m3u8_url = 'https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8'
youtube_url = 'rtmp://a.rtmp.youtube.com/live2/hkhg-sdd3-q3qd-jgp8'
video_streaming_function(m3u8_url, youtube_url)
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
import os
import sys
if len(sys.argv) <= 2:
print('Usage: program.py <url> <youtubelive-streamkey>')
exit()
url = sys.argv[1]
stream_key = sys.argv[2]
# python program to stream video from m3u8 url to youtube using python-ffmpeg-video-streaming module
stream = FFMPEG_Streaming_Video_to_YoutubeLive(url, stream_key,
format=Formats.hls,
representation=Representation(Size(640, 360), Bitrate(400)),
representation=Representation(Size(854, 480), Bitrate(800)),
representation=Representation(Size(1280, 720), Bitrate(1600)))
stream.start()
# to stop the stream
stream.stop()
from moviepy.editor import *
video = VideoFileClip("http://skynews-vh.akamaihd.net/i/skynews_1@319270/master.m3u8")
video.write_videofile("skynews.mp4")
import rtmpy
import time
def stream(stream_name, url):
"""
Function to stream m3u8 url to rtmp
"""
r = rtmpy.RTMP(
rtmp='rtmp://a.rtmp.youtube.com/live2',
app='app',
playpath=stream_name,
swfUrl='http://www.twitch.tv/widgets/live_embed_player.swf?hostname=www.twitch.tv&channel=' + stream_name + '&auto_play=true&start_volume=25',
tcUrl='rtmp://a.rtmp.youtube.com/live2',
pageUrl='http://www.twitch.tv/' + stream_name,
live=True
)
r.connect()
r.publish(stream_name)
r.stream('http://www.twitch.tv/' + stream_name + '/hls/' + url + '.m3u8')
time.sleep(5)
stream('
# the above code is the output of a program now remove:
[youtube] Nq2wYlWFucg: Downloading webpage
[youtube] Downloading just video Nq2wYlWFucg because of --no-playlist
[youtube] Nq2wYlWFucg: Downloading m3u8 information
[youtube] Nq2wYlWFucg: Downloading MPD manifest
from pyffmpeg import FFmpeg
def stream_to_rtmp(m3u8_url, rtmp_url):
ff = FFmpeg(
inputs={m3u8_url: None},
outputs={rtmp_url: '-c copy -f flv'}
)
ff.cmd
ff.run()
This for loop adds b to a c times.
fix invalid codeSun, 11 Dec 2022 ![calculator_output](https://user-images.githubusercontent.com/54179065/65385323-e6c8b800-ddb2-11e9-8f36-9af7a3c3e3f7.png)
## MACHINE LEARNING
**1) K-NEAREST NEIGHBOURS(KNN)**
* KNN is a supervised machine learning algorithm which can be used for both classification as well as regression problems. But it is mostly used in classification problems.
* KNN is a non-parametric and lazy learning algorithm. Non-parametric means there is no assumption for underlying data distribution. In other words, the model structure determined from the dataset. This will be very helpful in practice where most of the real world datasets do not follow mathematical theoretical assumptions. Lazy algorithm means it does not need any training data points for model generation. All training data used in the testing phase. This makes training faster and testing phase slower and costlier. Costly testing phase means time and memory.
* KNN algorithm uses ‘feature similarity’ to predict the
import pandas as pd
student = pd.DataFrame({
'ID': [1, 2, 3, 4, 5, 6],
'Name': ['A', 'B', 'C', 'D', 'E', 'F'],
'Gender': ['M', 'F', 'M', 'M', 'F', 'M'],
'Age': [18, 19, 20, 21, 22, 23],
'Grade': [2, 1, 4, 3, 2, 4]
})
# ALTER table to add new attributes / modify data type / drop attribute
student['City'] = ['B', 'A', 'C', 'A', 'B', 'C']
student['Grade'] = student['Grade'].astype('category')
student.drop(['City'], axis=1)
# UPDATE table to modify data
student['Name'] = ['AA', 'BB', 'CC', 'DD', 'EE', 'FF']
student['Gender'] = 'M'
student.loc[student['ID'] = 1, 'Grade'] = 2
class Student:
def __init__(self, name, age, grade):
self.name = name
self.age = age
self.grade = grade # 0 - 100
def get_grade(self):
return self.grade
class Course:
def __init__(self):
self.students = []
def add_student(self, student):
if len(self.students) < 30:
self.students.append(student)
def get_average_grade(self):
value = 0
for student in self.students:
value = value + student.get_grade()
return value / len(self.students)
s1 = Student("Tim", 19, 95)
s2 = Student("Bill", 19, 75)
s3 = Student("Jill", 19, 65)
course = Course()
course.add_student(s1)
course.add_student(s2)
course.add_student(s3)
course.
import sqlite3
connection = sqlite3.connect("database.db")
DB_cursor = connection.cursor()
DB_cursor.execute("CREATE TABLE students (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, age INTEGER NOT NULL, sex TEXT NOT NULL)" )
DB_cursor.execute("INSERT INTO students (name, age, sex) VALUES ('John', '21', 'M')")
DB_cursor.execute("INSERT INTO students (name, age, sex) VALUES ('Mary', '25', 'F')")
connection.commit()
DB_cursor.execute("SELECT * FROM students")
DB_cursor.execute("SELECT name, sex FROM students")
DB_cursor.execute("ALTER TABLE students ADD COLUMN school TEXT")
DB_cursor.execute("ALTER TABLE students MODIFY age INTEGER")
DB_cursor.execute("ALTER TABLE students DROP COLUMN school")
DB_cursor.execute("UPDATE
CREATE PROCEDURE sp_insert_student
(
IN p_id int,
IN p_name varchar(100),
IN p_gender varchar(1),
)
BEGIN
INSERT INTO tbl_student(id, name, gender)
VALUES (p_id, p_name, p_gender);
END//
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLSStream, HLSStreamWriter, HLSStreamWriterError
src = 'http://hls.nokia.com/nokia/Nokia_LTE_Dummy_H264_2M/Nokia_LTE_Dummy_H264_2M.m3u8'
hls = HLSStreamWriter(src, stream_name='stream')
hls.streams.append(HLSStream('out.m3u8',
bitrate=Bitrate(1280, 'k'),
representation=Representation(size=Size(1280, 720)),
format=Formats.h264()))
hls.start()
from moviepy.editor import VideoFileClip
# Replace with the actual M3U8 URL
url = "https://xxxxxxx.m3u8"
# Open the video file
clip = VideoFileClip(url)
# Set the codecs and other options
clip = clip.set_fps(30).set_audio_fps(44100).set_audio_bitrate(128)
# Write the video to file
clip.write_videofile("output.mp4", codec='libx264', audio_codec='aac')
import pym3u8
import http.client
import urllib.request
import urllib.parse
import urllib.error
import base64
import subprocess
import threading
import time
import json
# You must provide an m3u8 playlist url. The program will download the m3u8
# and segment files as they are listed in the m3u8 file. The files will be
# downloaded to the current directory.
# The program will merge the files into a single file called
# merged_stream.ts
# The program will then stream the file to the address you set below.
# The program will then loop and check the playlist every 60 seconds.
# If changes are detected the program will download and stream any new
# segments to the rtmp server.
#
# m3u8 file: https://bitdash-a.akamaihd.net/content/MI201109210084_1/m3u8s/f08e80da-bf1d-4e3d-8899-f0f6155f6efa.m3u8
from moviepy.editor import VideoFileClip
# Replace with the actual M3U8 URL
url = "https://xxxxxxx.m3u8"
# Open the video file
clip = VideoFileClip(url)
# Set the codecs and other options
clip = clip.set_fps(30).set_audio_fps(44100).set_audio_bitrate(128)
# Write the video to file
clip.write_videofile("output.flv", codec='libx264', audio_codec='aac')
def stream_m3u8_to_rtmp(url, rtmp_url):
source = FFmpeg(inputs={'input': url},
outputs={'output': rtmp_url})
source.run()
stream_m3u8_to_rtmp("https://xxxxxx/master.m3u8", 'rtmp://xxxx.cloudfront.net/cfx/st/xxx')
import subprocess
import sys
import os
import rtmpy
#youtube key
rtmpy.publish(
"rtmp://a.rtmp.youtube.com/live2/[youtube key]",
source = "http://[your ip]:[port]/[your m3u8]",
live = True,
start = True
)
fix invalid codeSun, 11 Dec 2022 # Exercise 3
> Practice Exe03.py and Exe04.py.
# Exercise 4
> Practice Exe03.py and Exe04.py.
def get_sum(lst):
return sum(lst)
get_sum([1,2,3,4,5,6])
from math import *
def fill_line():
term_width = int(os.popen('stty size', 'r').read().split()[1])
print('=' * term_width)
import re
re.sub(r'\[youtube\].*?\n', '', output)
def stream_file(self, url, duration=0):
"""
Stream file from url to rtmp server
:param url: url of file
:param duration: duration in seconds of file to stream, 0 for entire file
"""
if 'm3u8' in url:
self.stream_m3u8(url, duration)
else:
raise NotImplementedError('Only m3u8 streaming is supported at this time')
import mysql.connector
conn = mysql.connector.connect(
host = 'localhost',
user = 'root',
password = ''
database = 'student'
)
cursor = conn.cursor()
cursor.execute("CREATE DATABASE student")
conn.close()
import ffmpeg
stream = ffmpeg.input('http://playertest.longtailvideo.com/adaptive/bipbop/gear4/prog_index.m3u8')
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
import requests
import re
import time
import subprocess
def get_m3u8(url):
#url = "http://douyu.com/lapi/live/getPlay/96419"
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36'}
response = requests.get(url, headers=headers)
data = response.text
strinfo = re.compile('hls_url="(.*?)"')
result = strinfo.findall(data)
#print(result[0])
return result[0]
def stream(url):
print('start')
p = subprocess.Popen(['ffmpeg', '-i', url, '-c:v', 'libx264', '-c:a', 'aac', '-strict', '-2', '-f', '
from string import *
using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.IO;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
string url = "http://hls.cnr.cn/live/hls/cctv4_2/index.m3u8";
var request = (HttpWebRequest)WebRequest.Create(url);
request.Timeout = 10000;
request.Method = "GET";
request.KeepAlive = false;
using (var response = (HttpWebResponse)request.GetResponse())
{
using (var stream = response.GetResponseStream())
using (var reader = new System.IO.StreamReader(stream))
{
var result = reader.ReadToEnd();
var lines = result.Split('\n');
foreach (var i in lines)
{
if (i.StartsWith("http"))
{
var f =
import subprocess
import os
def runstream(m3u_url):
# runs FFMPEG to stream the live m3u8 to youtube
# requires ffmpeg installed and environment variable set
subprocess.Popen("ffmpeg -re -i " + m3u_url + " -c copy -f flv rtmp://a.rtmp.youtube.com/live2/" + os.getenv("YOUTUBE_STREAM_KEY"))
pass
runstream("https://mylivestream.com/hls/mystream.m3u8")
for line in iter(p.stdout.readline, b''):
print line,
sys.stdout.flush()
import rtmpy
# create the client
client = rtmpy.Client()
client.connect('rtmp://localhost/stream')
# publish the stream
client.publish('rtmp://streaming.server.com/stream.m3u8')
# disconnect
client.disconnect()
# the default connection parameters are
{
'app': 'stream',
'port': 1935,
'flash_ver': 'FMLE/3.0 (compatible; FMSc/1.0)',
'swf_url': 'http://streaming.server.com/example/stream.swf',
'tc_url': 'rtmp://streaming.server.com',
'page_url': 'http://streaming.server.com',
'video_size': (640, 480)
}
import videostream
import ffmpeg
# the code is from https://github.com/zachbruggeman/videostream
# this is a demo to stream webcam to rtmp server by using videostream
# coding=utf-8
# 以下是在docker環境下使用videostream這個package,將webcam的畫面轉成rtmp檔案
# 轉檔案的過程,基本上只要有ffmpeg的指令就可以做到,但很麻煩,因此使用videostream這個package簡化
# 這個package使用python執行,可以直接在docker中執行,不
def line():
print('=' * 70)
line()
def print_centralized(word):
word_length = len(word) + 4
print("*" * word_length)
print("*" + word + "*")
print("*" * word_length)
print_centralized("python")
import streamz
from streamz import Stream
import ffmpeg
import time
import os
# m3u8 url
url = 'http://hls-dvr-prod-us-west-1.hls.ttvnw.net/v1/playlist/hU2mEJ6YwZ6_0fHx4HBduA.m3u8?signature=4ae3e3a5d5a5b5de5c5f0d1aaf7fefbd&sig=c9a2b0fceb2f65ccae727f1c80d6b2c2&expire=1543979200&allow_source=true'
# rtmp url
rtmpUrl = 'rtmp://live-dfw.twitch.tv/app/live_230751265_RgYvN8rjWq3EqCe3qyFbvXKWgZuJzc'
# ffmpeg command
cmd = ffmpeg.input(url) \
.output(
def line_width():
cols, rows = shutil.get_terminal_size((80, 40))
return cols
def line():
print('=' * line_width())
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
video = ffmpeg.input("https://video-dev.github.io/streams/x36xhzz/x36xhzz.m3u8")
video = ffmpeg.output(video, "out.mp4")
video = ffmpeg.overwrite_output(video)
video = ffmpeg.run(video)
import subprocess
def stream_m3u8_to_youtube(m3u8_url, stream_key, title, description, quality, camera_name):
yt_url = "rtmp://a.rtmp.youtube.com/live2"
cmd = f'''
ffmpeg -v error -i {m3u8_url} -acodec copy -vcodec copy \
-f flv {yt_url}/{stream_key} -bsf:a aac_adtstoasc -c copy \
-metadata copy:title="{title}" \
-metadata copy:description="{description}" \
-metadata copy:quality="{quality}" \
-metadata copy:camera_name="{camera_name}"'''
subprocess.call(cmd, shell=True)
# libraries
from ffmpeg_streaming import Formats
from ffmpeg_streaming import Bitrate
from ffmpeg_streaming import Formats
from ffmpeg_streaming import Representation
from ffmpeg_streaming import Representation
from ffmpeg_streaming import Streaming
from ffmpeg_streaming import Streaming
# url
url = "https://storage.googleapis.com/shaka-demo-assets/angel-one/dash.mpd"
# video stream from url
stream1 = Streaming(url, timeout=10).streams[0]
# video creation
stream2 = stream1.stream(Representation(128000, 'mp4a.40.2', 'en', 'audio/mp4', 'aac', Bitrate(128000, 128000)))
# upload to youtube
stream2.output('output.mp4', format=Formats.mp4, audio_bitrate=128000)
import ffmpeg
import subprocess
# to stream from file or url
video_src = 'https://file.m3u8'
# to stream to youtube
video_dst = 'rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx'
command = ['ffmpeg',
'-y',
'-i', video_src,
'-vcodec', 'h264',
'-acodec', 'aac',
'-f', 'flv',
video_dst]
subprocess.Popen(command)
import os
import signal
import subprocess
from streamz import Stream
from streamz.dataframe import DataFrame
from streamz.dataframe import DataFrame
def run_command(command):
return subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
)
def kill_process_tree(pid, sig=signal.SIGTERM):
"""Kill an entire process tree (including grandchildren) with signal
"sig" and return a (gone, still_alive) tuple.
"gone" is a list of os.waitpid() style results for all the children
that were killed.
"still_alive" is a list of pid's that were alive before the signal was
sent that were still alive after it was sent.
"""
assert pid != os.getpid(), "error: cannot kill self"
parent = psutil.Process(pid)
children
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
def main():
input_stream = "http://hls.iptv.ge/stream/content/b054634c-8cbe-47ce-98f0-eba51e7c9f82/playlist.m3u8"
output_file_path = "out.mp4"
output_stream = "rtmp://a.rtmp.youtube.com/live2/[youtube_code]"
formats = Formats(
format_id='mp4',
format_note='mp4',
format='mp4',
extensions='mp4',
ffmpeg_codec='libx264',
ffmpeg_full_codec='libx264',
audio_codec='aac',
)
bitrate = Bitrate(
bitrate=65536,
width=1280,
height=720,
video_codec='h264',
fps=30,
)
representation =
def get_google():
return requests.get('https://www.google.com')
def remove_youtube_lines(lines):
for l in lines:
if l.startswith('[youtube]'):
continue
yield l
for l in remove_youtube_lines(lines):
print(l)
# OR
for l in lines:
if l.startswith('[youtube]'):
continue
print(l)
++
#include <iostream>
#include <string>
#include <regex>
#include <fstream>
#include <streambuf>
#include <memory>
#include <algorithm>
#include <functional>
#include <map>
#include <unistd.h>
#include <openssl/md5.h>
#include <curl/curl.h>
using namespace std;
#define CFG_PATH "./cfg"
#define CFG_FILE "hls.cfg"
#define LOG_PATH "./log"
#define LOG_FILE "hls.log"
void log_info(string content);
void log_error(string content);
void log_info(string content) {
time_t now;
time(&now);
string time_str = ctime(&now);
time_str.replace(time_str.find("\n"), 1, "");
time_str += " ";
def stream(url):
# Create an RTMPClient instance
client = pyrtmp.RTMPClient(sys.argv[2], sys.argv[3], sys.argv[4])
# Connect to the RTMP server
if not client.connect():
print('Could not connect to the RTMP server')
return
# Perform a stream publish
if not client.publish(sys.argv[5]):
print('Could not publish to the RTMP server')
return
# Stream the specified file
if not client.stream(url):
print('Could not stream the file')
return
stream('http://my.m3u8.url')
#!/usr/bin/python
from pyffmpeg import FFMpeg
import sys
infile = sys.argv[1]
outfile = sys.argv[2]
print infile
print outfile
ff = FFMpeg()
ff.input(infile).output(outfile, format='flv').run()
import sys
import subprocess as sp
import pym3u8
def get_segments(m3u8_url):
playlist = pym3u8.load(m3u8_url)
segments = playlist.segments
return segments
def streaming(m3u8_url, rtmp_url):
segments = get_segments(m3u8_url)
cmd = ['ffmpeg']
for segment in segments:
cmd.extend(['-i', segment.uri])
cmd.extend(['-c', 'copy'])
cmd.extend(['-f', 'flv'])
cmd.append(rtmp_url)
sp.Popen(cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL).wait()
if __name__ == '__main__':
m3u8_url = sys.argv[1]
rtmp_url = sys.argv[2]
streaming(m3u8_url, rtmp
import os
import sys
def StartRTMPStream(channel_name, channel_url):
os.system(
r"rtmpdump -r '{0}' -o {1}.mp4".format(channel_url, channel_name)
)
def StartRTMPStreamWithProxy(channel_name, channel_url, proxy_url):
os.system(
r"rtmpdump -r '{0}' -o {1}.mp4 --swfVfy 'http://www.ustream.tv/flash/viewer.swf' --swfUrl 'http://www.ustream.tv/flash/viewer.swf' --conn S:'http://www.ustream.tv/flash/viewer.connection.token' --conn S:'{2}'".format(channel_url, channel_name, proxy_url)
)
import random
print(random.randint(1,6))
#!/usr/bin/python
import requests
import subprocess
import time
url = "https://video.afreecatv.com/playlist/vod/playlist_cdn.m3u8?key=%s&p=%s&user=%s&code=%s"
key='5a5b5c5d5e5f'
p='1'
user='1234'
code='1234'
url = (url % (key,p,user,code))
res = requests.get(url, stream=True)
with open("list.m3u8", "wb") as f:
for chunk in res.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
for line in open("list.m3u8", "rb").readlines():
if line[0:4] == b"http":
url = line.strip()
print(url)
subprocess.call("ffmpeg -i %s -c copy -
import random
def dice():
output = random.randint(1,6)
return output
dice()
import youtube_upload
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation
import subprocess
import time
while True:
p = subprocess.Popen(["ffmpeg", "-re", "-i", "http://example.com/my.m3u8", "-acodec", "copy", "-vcodec", "copy", "-f", "flv", "rtmp://example.com/live/mystream"])
print("live stream started")
p.wait()
print("live stream stopped")
time.sleep(2)
import shutil
def print_line():
width, _ = shutil.get_terminal_size()
print('='*width)
print_line()
def remove_https(input_string):
return input_string.replace("https", "")
remove_https("https://www.youtube.com/watch?v=Nq2wYlWFucg")
def mysql_backup():
backup_sql()
restore_sql()
def hotel():
# this is a hotel managment project using mysql
hotel()
CREATE TABLE students(id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(30) NOT NULL,
class VARCHAR(30) NOT NULL,
age INT(30) NOT NULL);
fix invalid codeSun, 11 Dec 2022
import random
list_of_passwords = ['apple', 'banana', 'peach', 'grape', 'orange']
password = random.choice(list_of_passwords)
print(password)
from m3u8 import get_playlist, get_video_url, start_stream
from os import path
def stream(url):
m3u8 = get_playlist(url)
video_url = get_video_url(m3u8)
start_stream(video_url)
import shutil
def line(number):
column = shutil.get_terminal_size().columns
return '='*column
line(20)
def m3u8_to_rtmp_flv(m3u8_url, rtmp_url):
ts_urls = [urljoin(m3u8_url, url)
for url in re.findall(r'(/.*?\.ts)', requests.get(m3u8_url).text)]
ts_urls = ts_urls
if ts_urls:
ts_url = random.choice(ts_urls)
response = requests.get(ts_url, stream=True)
if response.status_code == 200:
print("Streaming:", ts_url)
return ffmpeg_stream(response, rtmp_url)
def ffmpeg_stream(response, rtmp_url):
ffmpeg_cmd = ['ffmpeg', '-y', '-i', '-', '-c', 'copy', '-f', 'flv', rtmp_url]
proc = Popen(ffmpeg_cmd, stdin=PIPE, stderr=P
import sqlite3
conn = sqlite3.connect('student.db')
print("Opened database successfully")
conn.execute('''CREATE TABLE STUDENT
(ID INT PRIMARY KEY NOT NULL,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
MARKS INT);''')
print("Table created successfully")
conn.execute("INSERT INTO STUDENT (ID,NAME,AGE,ADDRESS,MARKS) \
VALUES (1, 'Paul', 32, 'California', 200)");
conn.execute("INSERT INTO STUDENT (ID,NAME,AGE,ADDRESS,MARKS) \
VALUES (2, 'Allen', 25, 'Texas', 500)");
conn.execute("INSERT INTO STUDENT (ID,NAME,AGE,ADDRESS,MARKS) \
VALUES (3, 'Teddy', 23, 'Norway', 200)");
conn.execute("INSERT INTO STUDENT (ID,NAME
import ffmpeg
from livestreamer import Livestreamer, StreamError
from livestreamer.stream import RTMPStream
from livestreamer.plugins import Plugin, PluginError, NoStreamsError
from livestreamer.stream import RTMPStream
from livestreamer.compat import urlparse, urljoin, bytes, str
from livestreamer.plugin import Plugin
from livestreamer.plugin.api import http
from livestreamer.exceptions import (NoPluginError, NoStreamsError,
PluginError, StreamError)
from livestreamer.stream import StreamProcess
from livestreamer.stream.flvconcat import FLVTagDemuxer, FLVTagMuxer, FLVTagParseError
class YoutubeLive(Plugin):
@classmethod
def can_handle_url(self, url):
return "youtube" in url
def _get_streams(self):
self.logger.debug("Fetching stream info")
# This URL is used for fetching the RTMP URL
res = http.get(self.url)
url = urljoin(
string = "https://www.youtube.com/watch?v=Nq2wYlWFucg"
string[7:]
def m3u8_to_rtmp(m3u8_url, rtmp_url):
# m3u8_url = "https://video-dev.github.io/streams/x36xhzz/x36xhzz.m3u8"
# rtmp_url = "rtmp://localhost:1935/live/stream"
key_uri = m3u8_url.split("/")[-1]
m3u8_dir_path = "./" + key_uri
ts_file_list = []
ts_file_name = []
ts_path_list = []
ts_path_name = []
print("key_uri:%s" % key_uri)
if not os.path.exists(m3u8_dir_path):
os.mkdir(m3u8_dir_path)
# print(m3u8_url)
r = requests.get(m3u8_url)
with open(key_uri, "
from pyffmpeg import FFmpeg
def stream(url):
ff = FFmpeg(
inputs={url: None},
outputs={'rtmp://localhost/app/test': ['-c:v', 'libx264', '-c:a', 'aac', '-f', 'flv']})
ff.run()
stream('http://localhost:8000/test.m3u8')
def drawflowchart(n):
for i in range(n):
print("|-------------|")
print("| |")
print("| |")
print("| start |")
print("| |")
print("| |")
print("| |")
print("|-------------|")
print("| |")
print("| |")
print("| stop |")
print("| |")
print("| |")
print("|-------------|")
drawflowchart(5)
import subprocess
import time
try:
while True:
process = subprocess.Popen("ffmpeg -i 'http://xxx/xxx.m3u8' -c copy -f flv 'rtmp://xxx/xxx/xxx'", shell=True, stdout=subprocess.PIPE)
time.sleep(5)
except KeyboardInterrupt:
print("\nCtrl-C pressed. Shutting down.")
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLS, DASH
stream = HLS(
Formats.h264(),
Size(width=1280, height=720),
Bitrate(128, 128),
Representation(720, 1080)
)
stream.add_input("https://bitdash-a.akamaihd.net/content/sintel/hls/playlist.m3u8").output("../output.m3u8").run()
def create(name, roll_no):
file = open("student.txt", 'w')
file.write("Name: " + name + "\n")
file.write("Roll number: " + roll_no)
file.close()
def search(roll_no):
file = open("student.txt", 'r')
for line in file:
if line.startswith("Roll number:"):
if line[13:] == roll_no:
print("Found")
return
print("Not Found")
create("Rohan", "17BCE0522")
search("17BCE0522")
import os
def full_line(char = "="):
os.system("tput setab 7")
print(char * shutil.get_terminal_size().columns)
os.system("tput setab 0")
full_line()
import pyffmpeg
input_url = "http://example.com/test.m3u8"
output_url = "rtmp://127.0.0.1/live/test"
stream = pyffmpeg.VideoStream()
stream.open(input_url)
out = pyffmpeg.VideoOutput(output_url, stream.streams[0].width, stream.streams[0].height, "flv", "libx264")
while True:
frame = stream.get_frame()
if not frame:
break
out.write_frame(frame)
out.close()
stream.close()
def my_string():
return ascii_letters + digits + punctuation
my_string()
def stream_m3u8_rtmp(m3u8_url, rtmp_url):
response = requests.get(m3u8_url)
m3u8_content = response.content
m3u8_lines = m3u8_content.split("\n")
m3u8_ts_lines = [e for e in m3u8_lines if e.startswith("http")]
for ts_url in m3u8_ts_lines:
response = requests.get(ts_url)
ts_content = response.content
ts_file_name = path.basename(ts_url)
print ts_file_name
with open(ts_file_name, "wb") as f:
f.write(ts_content)
stream(ts_file_name, rtmp_url)
os.remove(ts_file_name)
import shutil
def printline():
cols, rows = shutil.get_terminal_size()
print("=" * cols)
The code does:
a = "https://www.youtube.com/watch?v=Nq2wYlWFucg"
print(a[7:])
import shutil as su
def fill_line():
return su.get_terminal_size().columns*'='
fill_line()
newest = max(ffmpeg.input(URL), key = os.path.getctime)
def printLine():
print('=' * 80)
import os
import time
def stream():
os.system("ffmpeg -i http://192.168.43.1:8080/hls/test.m3u8 -vcodec copy -acodec copy -absf aac_adtstoasc -f flv rtmp://a.rtmp.youtube.com/live2/x/")
stream()
def print_line(screen_width):
print('=' * screen_width)
print_line(40)
from ffmpy3 import FFmpeg
stream = FFmpeg(
inputs={'http://10.0.0.1:8081/hls/stream.m3u8': None},
outputs={'rtmp://a.rtmp.youtube.com/live2/x/y': '-c:v libx264 -crf 23 -c:a aac -ar 44100 -ac 1 -b:a 128k -bufsize 512k -f flv'}
)
stream.run()
def add(a, b):
return a + b
add(1, 2)
from flvstreamer.streamer import FLVStreamer
streamer = FLVStreamer()
streamer.setup(
url="http://wowzaec2demo.streamlock.net/vod-multitrack/_definst_/smil:ElephantsDream/ElephantsDream.smil/playlist.m3u8",
is_live=False,
rtmp="rtmp://a.rtmp.youtube.com/live2",
app="vod-multitrack",
playpath="mp4:ElephantsDream/ElephantsDream.mp4"
)
streamer.start()
import moviepy.editor as mp
def stream_video(m3u8_url):
clip = mp.VideoFileClip(m3u8_url)
clip.preview()
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLS, DASH, Format
hls_stream = HLS(
[
Representation(Bitrate('1000k'), Width(1920), Height(1080)),
Representation(Bitrate('500k'), Width(1280), Height(720)),
Representation(Bitrate('400k'), Width(1024), Height(576)),
Representation(Bitrate('300k'), Width(640), Height(480))
],
[
Format(Formats.mpegts, Representation(Bitrate('0k'), Width(640), Height(480))),
Format(Formats.mp4, Representation(Bitrate('0k'), Width(640), Height(480))),
],
base_url='http://localhost:8000/hls/'
)
dash_stream = DASH(
[
Representation(Bitrate('1000k'), Width(1920), Height(1080)),
Representation(Bitrate('500k'), Width(
This program creates a random password of a certain length and consists of letters, numbers, and symbols.
import sys
import requests
import ffmpeg
import http
import json
import subprocess
import streamlink
import os
import time
import threading
from urllib.parse import urlparse
import datetime
import dateutil.parser
def m3u8(url, rtmp):
# get m3u8 url from url
streams = streamlink.streams(url)
stream = streams["best"].url
print("stream " + stream)
start(stream, rtmp)
def start(url, rtmp):
# set up
url = urlparse(url)
url = url._replace(netloc='127.0.0.1:8080')
url = url.geturl()
rtmp = urlparse(rtmp)
rtmp = rtmp._replace(netloc='127.0.0.1:1935')
rtmp = rtmp.geturl()
t = threading.Thread(target=thread, args=(url, rtmp))
t.start()
def thread(
import shutil
print("=" * shutil.get_terminal_size().columns)
import requests
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLS
def main():
url = "http://www.streambox.fr/playlists/test_001/stream.m3u8"
response = requests.get(url, stream=True)
print(response)
#print(response.text)
if response.status_code == 200:
representation = Representation(Bitrate(128 * 1024), Size(640, 480))
hls_stream = HLS(representation)
hls_stream.parse_m3u8(response.text)
path_to_save = "/home/joshua/Desktop/py-ffmpeg/media/index.m3u8"
hls_stream.save(path_to_save)
else:
print("error")
if __name__ == "__main__":
main()
translateSun, 25 Dec 2022
from string import *
fix invalid codeThu, 29 Dec 2022 ## 10.3 pygal_maps_world.py
print("hello world".replace("hello", ""))
def school():
#create a school database
#add student name, class, roll number, phone number etc
#retrieve data
school()
def read_file_and_display_stat(file_name):
with open(file_name, "r") as input_file:
content = input_file.read()
num_vow = 0
num_cons = 0
num_upper = 0
num_lower = 0
for char in content:
if char in "aeiouAEIOU":
num_vow += 1
elif char.isalpha():
num_cons += 1
elif char.islower():
num_lower += 1
elif char.isupper():
num_upper += 1
print("Number of vowels: " + str(num_vow))
print("Number of consonants: " + str(num_cons))
print("Number of uppercase letters: " + str(num_upper))
print("Number of lowercase letters: " + str(num_lower))
read_file_and_display_stat("functions_and_files.txt")
def print_line(n):
line_width = 80
print("\n" + "=" * line_width)
print_line(1)
def student_data_entry():
# add the codes here
student_data_entry()
def clip_stream(url, name, start = 0, end = 0):
clip = VideoFileClip(url)
if start != 0:
clip = clip.subclip(start, end)
clip.write_videofile(name)
def make_string():
import string
print(string.ascii_letters)
print(string.ascii_lowercase)
print(string.ascii_uppercase)
print(string.digits)
print(string.hexdigits)
print(string.whitespace)
print(string.punctuation)
make_string()
def m3u8_to_rtmp(url, rtmp, timeout=10):
r = requests.get(url, stream=True)
chunks = []
ts_urls = []
for chunk in r.iter_content(chunk_size=1024):
if chunk:
chunks.append(chunk)
else:
m3u8_str = b''.join(chunks)
ts_urls = re.findall(r'(?<=#EXTINF:)[^\n]+\n(http[^\n]+)', m3u8_str.decode('utf-8'))
break
ts_urls = [(url, response_headers(url)) for url in ts_urls]
if not ts_urls:
raise Exception("Could not get ts urls")
ts_urls = sorted(ts_urls, key=lambda x: x[0])
rtmp_url = rtmp + '/' + url.split('/')[-1].split('
def update_marks(rollno):
with open('data.txt', 'r') as f:
data = f.read()
data = data.split('\n')
for i in range(len(data)):
data[i] = data[i].split()
if data[i][0] == rollno:
data[i][2] = input('Enter new marks: ')
with open('data.txt', 'w') as f:
for i in data:
f.write(' '.join(i) + '\n')
update_marks('101')
def create_hotel():
# create a 'hotel' database
# create a 'rooms' table, with 'room_number', 'room_type', 'price' columns
# create a 'guests' table, with 'guest_name', 'room_number'
# create a 'bookings' table, with 'guest_name', 'room_number', 'check_in_date', 'check_out_date' columns
# insert at least 3 guests into the 'guests' table
# insert at least 3 bookings into the 'bookings' table
pass
import os
import subprocess
def stream(m3u8_url, rtmp_url):
cmd = "rtmpdump --rtmp '{rtmp}' --playpath '{m3u8}' -o '{m3u8}.flv' -v".format(rtmp = rtmp_url, m3u8 = m3u8_url)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = p.communicate()
print out
p.kill()
def add(a, b):
return a + b
add(1, 2)
def line(a='-'):
return a*80
line()
import requests
from bs4 import BeautifulSoup
def scrape():
url = "https://www.programming-helper.com/generate-function"
r = requests.get(url)
soup = BeautifulSoup(r.text, 'html.parser')
results = soup.find_all('div', attrs={'class':'col-sm-12'})
for result in results:
print(result.text)
scrape()
import sqlite3
conn = sqlite3.connect('student_records.db')
c = conn.cursor()
# Create table
c.execute('''CREATE TABLE IF NOT EXISTS student
(id INTEGER PRIMARY KEY, name text, gender text, age INTEGER, grade REAL, dept text)''')
# Insert a row of data
c.execute("INSERT INTO student VALUES (1, 'Mary', 'female', 20, 'A', 'CSE')")
c.execute("INSERT INTO student VALUES (2, 'John', 'male', 21, 'B', 'CSE')")
c.execute("INSERT INTO student VALUES (3, 'Jane', 'female', 20, 'A+', 'CSE')")
c.execute("INSERT INTO student VALUES (4, 'Mike', 'male', 20, 'A+', 'CSE')")
c.execute("INSERT INTO student VALUES (5, 'Tom', 'male', 21, 'B+', 'CSE')")
c.execute("INS
import subprocess
command = 'ffmpeg -i "https://bitdash-a.akamaihd.net/content/MI201109210084_1/m3u8s/f08e80da-bf1d-4e3d-8899-f0f6155f6efa.m3u8" -c:v copy -c:a aac -ar 44100 -ab 128k -ac 2 -strict -2 -f flv -listen 1 rtmp://127.0.0.1:1935/live/stream'
subprocess.call(command, shell=True)
def stream(url):
url = url.replace('https', 'http')
stream = FLVStream(url)
stream.dump()
def full_line():
print("="*shutil.get_terminal_size().columns)
translateTue, 03 Jan 2023 def add(a, b):
return a + b
def add(a, b):
return a + b
add(1, 2)
def stream_m3u8(url_m3u8, Stream_to_youtube=True,
youtube_stream_key=None,
polling_interval=60,
quality_of_stream=None,
video_name=None,
video_description=None,
category_id=None,
tags=None):
# do stuff ...
pass
import m3u8
import ffmpy
import requests
import youtube_dl
def stream2youtube(stream_url, youtube_url):
m3u8_obj = m3u8.load(stream_url)
ts_list = []
for x in m3u8_obj.segments:
ts_list.append(x.uri)
if ts_list:
for t in ts_list:
if not t.startswith('http'):
print(t)
url = stream_url.replace(stream_url.split('/')[-1], t)
r = requests.get(url, allow_redirects=True)
with open(t, 'wb') as f:
f.write(r.content)
else:
continue
try:
stream_url.split('/')[-1]
ff = ffmpy.FFmpeg(
inputs={stream_url.split('/')[-1]: None},
outputs={'output.mp
def change_str(str):
return str[-1] + str[1:-1] + str[0]
change_str('abcd')
fix invalid codeSun, 11 Dec 2022
import random
def get_random_int(start, end, count):
random_list = []
for i in range(count):
random_list.append(random.randint(start,end))
return random_list
def stream_m3u8(url, rtmp_url):
t = threading.Thread(target=stream_m3u8_thread, args=(url, rtmp_url))
t.start()
return str(t.ident)
def stream_m3u8_thread(url, rtmp_url):
ts_segments = m3u8_to_ts(url)
ts_segments = ts_segments + " \n"
log_info("stream start")
for chunk in iter(lambda: ts_segments, ''):
rtmp_stream.write(chunk)
rtmp_stream.close()
log_info("stream end")
#os._exit(0)
import os
from os.path import isfile
def create_file(roll_no, name, marks):
#Check if file exists
if isfile("students.txt"):
#File Exists
with open("students.txt", "a") as students_file:
students_file.write("\n" + str(roll_no) + "\t" + name + "\t" + str(marks))
else:
#Create a new file
with open("students.txt", "w") as students_file:
students_file.write("Roll Number\tName\tMarks")
students_file.write("\n" + str(roll_no) + "\t" + name + "\t" + str(marks))
def update_marks(roll_no, new_marks):
with open("students.txt", "r") as students_file:
lines = students_file.readlines()
with open("students.txt", "w") as students_file:
for line in lines
from subprocess import Popen
import sys
import os
def stream():
m3u8 = sys.argv[1]
stream_key = sys.argv[2]
name = sys.argv[3]
rtmp_url = 'rtmp://localhost:1935/rtmp/'
rtmp_url = rtmp_url + stream_key
p = Popen('ffmpeg -i {} -c copy -f flv {}'.format(m3u8, rtmp_url), shell=True)
print(p.pid)
return p.pid
stream()
import ffmpeg
URL = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
stream = ffmpeg.input(URL)
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx-xxxx', format='flv', vcodec="copy", acodec="aac", ac="2", ar="44100", ab="128k", strict="-2", flags="+global_header", bufsize="3000k")
ffmpeg.run(stream)
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLS, DASH, RTMP
out1 = HLS(
name="video_out",
formats=[Formats.h264(), Formats.mpegts()], # see Formats
map="video_out",
hls_list_size=3, # keep 3 hls files
hls_time=4, # 4 seconds
hls_flags="split_by_time", # or split_by_time, append_list or omit
)
out2 = DASH(
name="video_out",
formats=[Formats.h264(), Formats.vp9()], # see Formats
map="video_out",
dash_segment_duration=3, # 3 seconds
)
out3 = RTMP(
name="video_out",
format=Formats.h264(), # see Formats
map="video_out",
rtmp_live
def print_full_line():
print("=" * shutil.get_terminal_size().columns)
print_full_line()
youtube_dl.YoutubeDL({'format': 'mp4', 'outtmpl': '{}.mp4'.format(video_id)}).download(['https://www.youtube.com/watch?v={}'.format(video_id)])
import ffmpeg
stream = ffmpeg.input('http://qthttp.apple.com.edgesuite.net/1010qwoeiuryfg/sl.m3u8')
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/2222-2222-2222-2222')
ffmpeg.run(stream)
import os
def remove_a():
f1 = open('test.txt', 'r')
f2 = open('test2.txt', 'w')
for line in f1.readlines():
if 'a' not in line:
f2.write(line)
f1.close()
f2.close()
remove_a()
import ffmpeg
#ffmpeg -i "https://video-dev.github.io/streams/x36xhzz/x36xhzz.m3u8" -c copy output.mp4
in_stream = ffmpeg.input('https://video-dev.github.io/streams/x36xhzz/x36xhzz.m3u8')
out_stream = ffmpeg.output(in_stream,"output.mp4")
ffmpeg.run(out_stream)
def print_all_ascii():
import string
print(string.ascii_letters)
print(string.ascii_lowercase)
print(string.ascii_uppercase)
print(string.digits)
print(string.hexdigits)
print(string.whitespace)
print(string.punctuation)
print_all_ascii()
import shutil
def fullline():
return shutil.get_terminal_size().columns * "="
fullline()
from ffmpy import FFmpeg
ff = FFmpeg(
inputs={'https://www.wowza.com/_h264/BigBuckBunny_115k.m3u8': None},
outputs={'rtmp://a.rtmp.youtube.com/live2/gke2-exa7-u2jf-m7bz': '-c:v libx264 -c:a aac -ar 44100 -ac 2 -b:v 1500k -b:a 96k -pix_fmt yuv420p -profile:v main -s 1280x720 -preset veryfast -g 60 -r 30'}
)
ff.run()
def add(a, b):
return a + b
add(1, 2)
import string
for i in string.ascii_letters:
print(i)
#two variables seperated by a comma create a tuple
a, b = 0, 1
while a < 10:
print(a)
a, b = b, a+b
def print_line(number_of_equals):
print("=" * number_of_equals)
print_line(10)
s = "Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together. Python's simple, easy to learn syntax emphasizes readability and therefore reduces the cost of program maintenance. Python supports modules and packages, which encourages program modularity and code reuse. The Python interpreter and the extensive standard library are available in source or binary form without charge for all major platforms, and can be freely distributed."
print(s[:s.find("The Python interpreter and the extensive standard library are available in source or binary form without charge for all major platforms, and can be freely distributed.")])
import subprocess
import shlex
def play_m3u8_youtube(url):
cmd = 'ffmpeg -i "{}" -vcodec copy -acodec copy -f flv rtmp://x.rtmp.youtube.com/live2/{}'.format(url, youtube_key)
subprocess.call(shlex.split(cmd))
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLSStream, format_bitrate, Representation
stream = HLSStream(input_path='m3u8 url', output="youtube url")
stream.parse_codec_data()
bitrate = Bitrate(
128,
128
)
size = Size(
320,
240
)
stream.generate_representations(
bitrate,
size,
)
stream.start(capture_output=True)
is equivalent to
translateSun, 25 Dec 2022 #include <math.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
float A, B, C;
float cubeWidth = 20;
int width = 160, height = 44;
float zBuffer[160 * 44];
char buffer[160 * 44];
int backgroundASCIICode = '.';
int distanceFromCam = 100;
float horizontalOffset;
float K1 = 40;
float incrementSpeed = 0.6;
float x, y, z;
float ooz;
int xp, yp;
int idx;
float calculateX(int i, int j, int k) {
return j * sin(A) * sin(B) * cos(C) - k * cos(A) * sin(B) * cos(C) +
j * cos(A) * sin(C) + k * sin(A) * sin(C) + i * cos(B) * cos(C);
}
float calculateY(int i, int j,
def getStream(youtubeUrl):
URL = input("Enter here: /n")
stream = ffmpeg.input(URL)
stream = ffmpeg.output(stream, 'rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx-xxxx', format='flv', vcodec="copy", acodec="aac", ac="2", ar="44100", ab="128k", strict="-2", flags="+global_header", bufsize="3000k")
ffmpeg.run(stream)
from python_ffmpeg_video_streaming import Input, Output
input = Input(url)
output = Output(youtube_link)
output.run()
a = "Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together. Python's simple, easy to learn syntax emphasizes readability and therefore reduces the cost of program maintenance. Python supports modules and packages, which encourages program modularity and code reuse. The Python interpreter and the extensive standard library are available in source or binary form without charge for all major platforms, and can be freely distributed."
print(a)
print(a.replace("The Python interpreter and the extensive standard library are available in source or binary form without charge for all major platforms, and can be freely distributed.", " "))
import random
digits = list(range(10))
lower_alpha = list(map(chr, range(97, 123)))
upper_alpha = list(map(chr, range(65, 91)))
all_chars = digits + lower_alpha + upper_alpha
def password_generator(length):
pw = random.choices(all_chars, k=length)
return "".join(pw)
password_generator(10)
def youtube_download(url):
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
youtube_download('https://www.youtube.com/watch?v=Nq2wYlWFucg')
from ffmpy import FFmpeg
input_filepath = 'http://172.18.0.4:8000/live/test'
output_filepath = 'rtmp://a.rtmp.youtube.com/live2/x/y'
ff = FFmpeg(
inputs={input_filepath: None},
outputs={output_filepath: None}
)
ff.run()
#!/usr/bin/env python
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLSStream
import requests
from bs4 import BeautifulSoup
from urllib.request import urlopen
# Getting the M3U8 URL
url = 'https://github.com/lazyadmin/python-ffmpeg-video-streaming/blob/master/sample.m3u8'
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
link = soup.pre.text
# Getting the stream object
stream = HLSStream(link)
# Get the available formats
print('Available formats:')
for f in stream.formats:
print(f)
# Get the available profiles
print('\nAvailable profiles:')
for p in stream.profiles:
print(p)
# Get the available subtitles
print('\nAvailable subtitles:')
for s in stream.subtitles:
print(s)
def stream_m3u8(m3u8_url, rtmp_path):
stream = ffmpeg.input(m3u8_url)
stream = ffmpeg.output(stream, rtmp_path, vcodec='copy', acodec='copy')
ffmpeg.run(stream)
stream_m3u8(m3u8_url, rtmp_path)
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import HLS, Media, Output
from ffmpeg_streaming.inputs import Input
from datetime import datetime
from random import randint
from time import sleep
import os
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
# The CLIENT_SECRETS_FILE variable specifies the name of a file that contains
# the OAuth 2.0 information for this application, including its client_id and
# client_secret.
CLIENT_SECRETS_FILE = "client_secret.json"
# This OAuth 2.0 access scope allows for full read/write access to the
# authenticated user's account and requires requests to use an SSL connection.
SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl']
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'
import subprocess
import sys
import os
os.system("youtube-dl --rm-cache-dir")
def stream(link):
subprocess.call(['youtube-dl', '-f', 'bestvideo[height<=720]+bestaudio/best[height<=720]', link])
link = sys.argv[1]
stream(link)
import string
def print_all():
print(string.ascii_letters + string.digits + string.punctuation)
fix invalid codeThu, 29 Dec 2022 def add(a, b):
return a + b
# -*- coding: utf-8 -*-
import os,time
from ffmpy import FFmpeg
def video_stream(url):
print url
video_info = FFmpeg(
inputs={url: None},
outputs={'rtmp://a.rtmp.youtube.com/live2/[youtube_key]': None}
).run()
print 'start'
time.sleep(2)
print video_info
if __name__ == '__main__':
url = 'http://url.m3u8'
video_stream(url)
import subprocess
import time
import sys
from rtmpy import RTMPClient
import urllib.request
import re
url = str(sys.argv[1])
while True:
print("updating...")
response = urllib.request.urlopen(url)
data = response.read()
text = data.decode('utf-8')
m3u8 = re.search('(?P<url>https?://[^\s]+)', text).group("url")
print(m3u8)
client = RTMPClient(app="live", host="rtmp://x.rtmp.youtube.com/live2", rtmp_timeout=15)
client.connect()
client.publish(m3u8)
client.close()
time.sleep(5)
def remove(string, remove):
return string.replace(remove, '')
remove("this is a string", "is")
import requests
import os
def _get_key():
return os.environ['YOUTUBE_KEY']
def create_broadcast(title, description, scheduled_start_time):
url = 'https://www.googleapis.com/youtube/v3/liveBroadcasts'
headers = {
'Authorization': 'Bearer ' + _get_key(),
'Content-Type': 'application/json'
}
data = {
'kind': 'youtube#liveBroadcast',
'snippet': {
'title': title,
'description': description,
'scheduledStartTime': scheduled_start_time
},
'status': {
'privacyStatus': 'public'
}
}
r = requests.post(url, headers=headers, json=data)
return r.json()
def create_stream(title, description):
url = 'https://www.googleapis.com/youtube/v3/liveStreams'
headers = {
import requests
url = "http://www.google.com/"
response = requests.get(url)
print(response)
import shutil
def fill_line(sign = '='):
terminal_width = shutil.get_terminal_size((80, 20)).columns
print('=' * terminal_width)
fill_line()
# IMPORTANT: the command should work only if the module is not imported yet
def print_constants():
import string
for i in dir(string):
if i.isupper():
print(i)
import random
def password_generator(n):
chars = "abcdefghijklmnopqrstuvwxyz1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*"
password = ""
for i in range(n):
password += random.choice(chars)
return password
password_generator(15)
translateSun, 25 Dec 2022 def add(a, b):
return a + b
from python_ffmpeg_video_streaming import Input, Output, Filter
input = Input('http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4')
filter = Filter('transpose', 4)
output = Output(input, filter, 'out.mp4').run()
import shutil
def print_full_line():
width = shutil.get_terminal_size().columns
print("=" * width)
print_full_line()
import ffmpeg
import io
import os
import subprocess as sp
def generate_stream():
INRES = '1280x720' # input resolution
OUTRES = '1280x720' # output resolution
FPS = '30' # target FPS
# URL = 'https://www.youtube.com/watch?v=bJLG97xE7Oc' # input video URL
# URL = 'http://techslides.com/demos/sample-videos/small.mp4'
#URL = 'http://localhost:8080/?action=stream'
URL = 'https://www.youtube.com/watch?v=lWjQ2Df0w_E'
# URL = 'https://archive.org/download/BigBuckBunny_124/Content/big_buck_bunny_720p_surround.mp4'
# FFMPEG_BIN = "ffmpeg" # on Linux ans Mac OS
FFMPEG_BIN = "C:/ffmpeg/bin/ffmpeg.exe" # on Windows
def line():
print('=' * 10)
line()
from shutil import get_terminal_size
from math import pi, sin
def print_full_line():
width = get_terminal_size().columns
print("=" * width)
def area_of_circle(radius):
return pi * (radius ** 2)
def area_of_triangle(base, height):
return (base * height) / 2
def area_of_sector(radius, angle):
return area_of_circle(radius) * (angle / 360)
def area_of_segment(radius, angle):
return area_of_sector(radius, angle) - (0.5 * radius * radius * sin(angle * pi / 180))
radius = float(input("enter radius"))
angle = float(input("enter angle"))
print("are of circle = ", area_of_circle(radius))
print("area of triangle = ", area_of_triangle(radius, angle))
print("area of segment = ", area_of_segment(radius, angle))
def streamm3u8(m3u8url, rtmpurl):
m3u8stream = urllib.urlopen(m3u8url)
m3u8content = m3u8stream.read()
m3u8stream.close()
m3u8lines = m3u8content.splitlines()
tsurl = m3u8lines[-1]
tsstream = urllib.urlopen(tsurl)
tsdata = tsstream.read()
tsstream.close()
rtmpclient = pyrtmp.RTMPClient(rtmpurl, tsdata)
rtmpclient.write()
rtmpclient.close()
def hotel_management_system(customer_name, customer_id, customer_email, customer_phone_number, customer_address, customer_payment_method):
return f'Customer Name: {customer_name} \n Customer ID: {customer_id} \n Customer Email: {customer_email} \n Customer Phone Number: {customer_phone_number} \n Customer Address: {customer_address} \n Customer Payment Method: {customer_payment_method}'
hotel_management_system('Kwame', 7, 'kwame@gmail.com', '0900000000', 'Kumasi, Ghana', 'MTN Mobile Money')
import ffmpeg
stream = ffmpeg.input('/path/to/in.m3u8')
stream = ffmpeg.output(stream, 'out.ts')
ffmpeg.run(stream)
def print_string():
s = string.ascii_letters + string.digits + string.punctuation
print(s)
from math import *
def streamtoYoutube(stream_url, youtube_stream_key):
stream_url = stream_url
youtube_stream_key = youtube_stream_key
ffmpeg_command = f"ffmpeg -use_wallclock_as_timestamps 1 -i {stream_url} -c copy -f flv rtmp://a.rtmp.youtube.com/live2/{youtube_stream_key}"
return os.system(ffmpeg_command)
streamtoYoutube("your_stream_url", "your_youtube_stream_key")
import os
os.system(ffmpeg -i http://example.com/hls/index.m3u8 -c copy -f flv rtmp://rtmp-server/live/show)
import subprocess
import os
import sys
import requests
def stream(link, title, description, tags, category_id, privacy_status, thumbnail_path, access_token):
cmd = 'avconv -i "{}" -c:v copy -c:a aac -strict experimental -f flv rtmp://a.rtmp.youtube.com/live2/{}'.format(link, access_token)
cmd_ = cmd.split()
subprocess.Popen(cmd_)
print("Streaming started.")
print("\n")
print('You can watch the streaming at https://www.youtube.com/watch?v={}'.format(access_token))
import ffmpeg
import os
import subprocess
# Open a video file and recorder the file names
# and durations of all the video streams.
input = ffmpeg.input('C:\Users\TEMP\Downloads\hls\hls.m3u8')
print('Streams:', input.streams.video)
# Print the metadata of each video stream.
for stream in input.streams.video:
print('Metadata:', stream)
# Select the first video stream and get its metadata.
metadata = input.streams.video[0]
# Create an output stream that is a copy of the input stream,
# with the same metadata.
output = ffmpeg.output(input, metadata, 'output.mkv')
# Save the output stream to a file.
ffmpeg.run(output)
CREATE DATABASE school_management;
import os
import sys
from ffmpeg_streaming import Formats, Bitrate, Representation, Size
from ffmpeg_streaming import H264Codec, AACCodec, CodecOptions, Formats, Representation, Size
from ffmpeg_streaming import Formats, Bitrate, Representation, Size, Profile
from ffmpeg_streaming import FFmpeg
# url = "http://www.streambox.fr/playlists/test_001/stream.m3u8"
url = "https://bitmovin-a.akamaihd.net/content/sintel/hls/playlist.m3u8"
output_url = 'rtmp://a.rtmp.youtube.com/live2/********-********-****-****-********'
# ffmpeg -i $url -vcodec copy -acodec copy -f flv $output_url
ff = FFmpeg(
inputs={url: None},
outputs={output_url: '-vcodec copy -acodec copy -f flv'}
)
print(
def add(a, b):
return a + b
add(1, 2)
# Function that takes two numbers and returns their sum.
def add(a, b):
return a + b
# Function that takes two numbers and returns their product.
def multiply(a, b):
return a * b
def subtraction(a, b):
return a - b
print(add(1, 2))
print(multiply(1, 2))
print(subtraction(1, 2))
import shutil
#using shutil to get screen width
width = shutil.get_terminal_size()[0]
def fill_line(n):
line = '=' * n
return line
line = fill_line(width)
print(line)
a = "https://www.youtube.com/watch?v=Nq2wYlWFucg"
a[8:]
function add(a, b) {
return a + b
}
import subprocess
def exec_cmd(cmd):
subprocess.Popen(cmd, shell=True)
exec_cmd('python3 stream_to_youtube.py')
import subprocess
def download_youtube(video_id):
cmd = ['youtube-dl',
'--no-playlist',
'--no-warnings',
'--quiet',
'-f', 'mp4',
'https://www.youtube.com/watch?v={}'.format(video_id)
]
subprocess.run(cmd)
fix invalid codeThu, 29 Dec 2022 <div><object type="image/svg+xml" data="{{ site.url }}/assets/pygal_flowchart.svg" width="500" height="500"></object></div>
def get_filename(url):
return url.split('/')[-1]
get_filename('https://manifest.googlevideo.com/api/manifest/hls_playlist/expire/index.m3u8')
#
ffmpeg -i "https://XXXXX.m3u8" -acodec copy -vcodec copy -f flv rtmp://XXXX/live
def add(a, b, c):
return a + b + c
add(1, 2, 3)
def stream(url, rtmp):
while True:
try:
with PyChunkedEncode(url) as encoder:
encoder.output(rtmp)
except Exception as e:
import traceback
traceback.print_exc()
print(e)
time.sleep(2)
import os
import sys
import subprocess
import moviepy.editor as mp
# color class
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# fucntion to check the arguments
def check_arguments():
if len(sys.argv) != 4:
print(bcolors.WARNING + "Invalid arguments" + bcolors.ENDC)
print("Usage: stream_to_youtube.py <m3u8_url> <youtube_url> <duration_in_seconds>")
print("Example: stream_to_youtube.py https://www.sample-videos.com/video123/m3u8/hls-variant-audio
fix invalid codeSun, 11 Dec 2022 ## 4. To fix the invalid code by using the Python AST module
def print_full_line():
width = shutil.get_terminal_size().columns
print("=" * width)
[print("=" * shutil.get_terminal_size().columns) for _ in range(3)]
import requests
from bs4 import BeautifulSoup
import subprocess, os, sys
def stream_to_youtube(m3u8_url):
streamlink = "streamlink"
quality = "best"
youtube_stream_url = "rtmp://a.rtmp.youtube.com/live2"
streamlink_cmd = [streamlink, m3u8_url, quality, "-Q", "--hls-segment-threads", "4", "--hls-live-edge", "3",
"-O"]
ffmpeg_cmd = ["ffmpeg", "-i", "-", "-c", "copy", "-f", "flv", youtube_stream_url]
with subprocess.Popen(streamlink_cmd, stdout=subprocess.PIPE) as streamlink_process, subprocess.Popen(
ffmpeg_cmd, stdin=streamlink_process.stdout, stdout=sys.stdout, stderr=sys.stderr) as ffmpeg_process:
streamlink_process.
import shutil
def full_line():
rows, columns = shutil.get_terminal_size()
print('='*columns)
import subprocess
import threading
import shlex
import os
import sys
import urllib
import time
import datetime
def log(s):
print('[%s][%s] %s' % (datetime.datetime.now(), threading.currentThread().getName(), s))
def runCmd(cmd):
try:
p = subprocess.Popen(shlex.split(cmd), stdout=sys.stdout, stderr=sys.stderr)
return p
except Exception as e:
log("Error: " + str(e))
def getFile(url):
try:
tmp = urllib.request.urlopen(url)
return tmp.readlines()
except Exception as e:
log("Error: " + str(e))
def getFile2(url):
try:
return urllib.request.urlopen(url).read()
except Exception as e:
log("Error: " + str(e))
def getFile3
def stream_m3u8(url):
cwd = os.getcwd()
os.chdir(os.path.join(cwd, '..', '..', '..', '..', '..', 'Streamlink', 'bin', 'streamlink'))
subprocess.call(['streamlink', url, 'best', '--quiet', '--hls-live-restart', '--hls-segment-threads', '3', '--hls-segment-timeout', '30', '--hls-segment-attempts', '3', '--hls-playlist-reload', '30', '--hls-segment-timeout', '30', '-o', '-', '|', 'ffmpeg', '-i', 'pipe:0', '-c', 'copy', '-f', 'flv', 'rtmp://a.rtmp.youtube.com/live2/' + livestreamkey])
os.chdir(cwd)
def stream(url, rtmp):
from pyffmpeg import FFmpeg
from threading import Thread
from time import sleep
from sys import exit
ff = FFmpeg(
inputs={url: None},
outputs={rtmp: '-loglevel panic'}
)
try:
ff.run()
except KeyboardInterrupt:
print('\nKeyboardInterrupt')
except Exception as e:
print('\nException', e)
finally:
ff.process.kill()
import random
def dice():
return random.randint(1, 6)
dice()