话不多说, 直接上代码, 代码有注释, 解释得很清楚

这一个模块是基本介绍. 下一个模块是多线程, 多进程调用, 让多个声道同时播放不同音乐

模块名: sounddevice_instructions.py

import os
import sys
import time
import wave
import numpy as np
import array
import sounddevice as sd
from scipy.io import wavfile
import soundfile

class MyException(Exception):
    """
    自定义的异常类
    """
    def __init__(self, *args):
        self.args = args

def preliminary_instruction():
    """
    初步介绍 sounddevice 查询声卡相关操作
    :return:
    """
    # 首先获取与当前主机连接的声卡设备驱动信息
    drivers_tuple = sd.query_hostapis()
    print(drivers_tuple)  # 返回一个包含声卡驱动信息的元组, 元组的每个元素, 是一个个字典, 包含了每个驱动的详细信息

    for driver_msg_dict in drivers_tuple:
        # 能够获取每个驱动的名字
        print(driver_msg_dict['name'], end=", ")  # MME, Windows DirectSound, ASIO, Windows WASAPI, Windows WDM-KS,

    # 查询当前主机能用的声卡声道
    devices_list = sd.query_devices()  # 返回一个列表

    # 每个设备信息, 以字典形式呈现
    for device_msg_dict in devices_list:
        print(device_msg_dict)

# 下面两个函数, 是根据声卡声道名字, 获取声卡声道名称 及 id, 区分 输出 和 输入 两种声道
def get_input_device_id_by_name(channel_name):
    """
    功能: 根据声卡声道名字, 获取 输入 声道 id
    :return: 返回输入声道id
    """

    devices_list = sd.query_devices()
    for index, device_msg_dict in enumerate(devices_list):
        if channel_name == device_msg_dict["name"] and device_msg_dict["max_input_channels"] > 0:
            return index
    else:
        raise MyException("找不到该设备!!!")

def get_output_device_id_by_name(channel_name):
    """
    功能: 根据声卡声道名字, 获取 输出 声道 id
    :return: 返回输出声道id
    """

    devices_list = sd.query_devices()
    for index, device_msg_dict in enumerate(devices_list):
        if channel_name == device_msg_dict["name"] and device_msg_dict["max_output_channels"] > 0:
            return index
    else:
        raise MyException("找不到该设备!!!")

def get_audio_devices_all_msg_dict():
    audio_drivers_and_channels_msg_dict = {}
    audio_input_channels_msg_dict = {}
    audio_output_channels_msg_dict = {}

    # 使用sounddevice 获取电脑连接的声卡以及系统自带的所有音频驱动信息(驱动, 声道名, id)
    this_tmp_dict = {}
    host_api_tuple = sd.query_hostapis()

    for temp_dict in host_api_tuple:
        this_tmp_dict[temp_dict["name"]] = temp_dict["devices"]

    channels_list = sd.query_devices()
    for driver_name in this_tmp_dict:
        audio_drivers_and_channels_msg_dict[driver_name] = []
        audio_input_channels_msg_dict[driver_name] = []
        audio_output_channels_msg_dict[driver_name] = []

        for id in this_tmp_dict[driver_name]:
            audio_drivers_and_channels_msg_dict[driver_name].append((id, channels_list[id]["name"]))

            if channels_list[id]["max_input_channels"] > 0:
                audi

def read_data(audio_file_path, audio_channels):
    # wav格式文件与 raw(pcm) 格式区分开, 不同格式, 获取其数据内容的方式不一样
    if audio_file_path.endswith(".wav"):
        data_array, sample_rate = soundfile.read(audio_file_path)
        return data_array

    elif audio_file_path.endswith(".pcm") or audio_file_path.endswith(".raw"):
        # 打开一个音频文件, 以 raw(pcm) 格式为例
        data_array = array.array('h')
        with open(audio_file_path, "rb") as f:
            data_array.frombytes(f.read())

        # 根据声道数, 来进行切分(我用的音频是双声道)
        data_array = data_array[::audio_channels]  # 有几个声道, 切分的步长就是几, 这样就能单独切出来一个声道
        return data_array

def play_audio_file(audio_file_path, channel_id, audio_channels, sample_rate):

    # 将本机默认的输出声道, 改为 自己设定的声道
    # sd.default.device 是一个列表, 第一个元素是: 默认的输入设备id;    第二个是默认的输出设备id
    sd.default.device[1] = channel_id

    # 常选参数, 一个数据, 一个采样率, 另外还有一个: blocking=True, 若设置, 则表示播放完毕当前音频再往下进行程序
    data_array = read_data(audio_file_path, audio_channels)
    sd.play(data_array, sample_rate)
    sd.wait()  # 表示等到此音频文件播放完毕之后再往下进行程序
    # time.sleep(20)  # 使用 time.sleep() ---> 休眠几秒, 音频文件就播放几秒, 时长自己控制

    # 注: 如果没有 类似休眠 等延时操作, 则程序只会一闪而过, 不会播放音频

# 使用 sounddevice_example 录制音频, 提示也可以用多进程
def do_record(channel_id, file_path):
    # 首先设置默认录音声道id, id不同, 调用的录音声卡也会不同, 和 播放一样, 也支持 多进程+多线程, 多个声道同时录音
    sd.default.device[0] = channel_id

    # 再调用函数录音
    sample_rate = 44100  # 音频采样率
    length = 10  # 时长, 单位秒
    record_data = sd.rec(frames=length*sample_rate, samplerate=sample_rate, channels=1, blocking=True)  # blocking=True, 能够让录音直到时长

    wavfile.write(file_path, sample_rate, record_data)

# 边录边播
def play_and_record(input_channel_id, output_channel_id, play_audio_file_path, rec_file_path, play_audio_channels=1,
                    play_audio_fs=44100, rec_file_channels=1):

    # 首先设置默认输出和输入声道
    sd.default.device[0] = input_channel_id
    sd.default.device[1] = output_channel_id

    # 开始边录边播
    data_array = read_data(play_audio_file_path, play_audio_channels)
    rec_data = sd.playrec(data=data_array, samplerate=play_audio_fs, channels=rec_file_channels, blocking=True)

    # 存储录音文件
    wavfile.write(rec_file_path, play_audio_fs, rec_data)

if __name__ == "__main__":
    preliminary_instruction()
    # file_path = r"F:\CloudMusic\download\FIRBetterLife.raw"
    file_path = r"F:\CloudMusic\download\EpicScoreFireHead.raw"
    output_id = get_output_device_id_by_name("喇叭/耳机 (Realtek High Definition Audio(SST))")
    rec_file_path = os.getcwd() + "\\test_record.wav"
    input_id = get_input_device_id_by_name("麦克风阵列 (Realtek High Definition Audio(SST))")

    # play_audio_file(rec_file_path, output_id, 1, 44100)  # 只播放
    # do_record(channel_id, file_path)  # 只录音

    # play_and_record(input_id, output_id, file_path, rec_file_path, play_audio_channels=2)  # 边录边播

    # file_path = sys.argv[1]
    # output_id = int(sys.argv[2])
    # audio_channels = int(sys.argv[3])
    # fs = int(sys.argv[4])
    # play_audio_file(file_path, output_id, audio_channels, fs)  # 接收shell参数, 多进程多声道播放音乐
下面这个模块是多进程, 多线程调用, 调用的是上面那个模块

模块名: multi_play_audio.py

import subprocess
from python_audio_packages.sounddevice_instructions import get_output_device_id_by_name
import threading

"""
    注: 只有用多进程, 才能实现多声道同时播放不同音频, 一个进程 调用一个 sounddevice , 占用一个声卡声道;
    调用这几个 子进程的时候, 可以用 多线程: 原因--->可以用在程序代码内部; 
    而 multiprocessing.Process 只能用在 if __name__ == "__main__": 之后(Windows系统下是这样)
"""

channel1 = "喇叭/耳机 (Realtek High Definit"
channel2 = "喇叭/耳机 (Realtek High Definition Audio(SST))"

channel_id_1 = str(get_output_device_id_by_name(channel1))
channel_id_2 = str(get_output_device_id_by_name(channel2))

audio_file1 = r"F:\CloudMusic\download\FIRBetterLife.raw"
audio_file2 = r"F:\CloudMusic\download\EpicScoreFireHead.raw"

cmd1 = "python sounddevice_instructions.py " + audio_file1 + " " + channel_id_1 + " 2 44100"
cmd2 = "python sounddevice_instructions.py " + audio_file2 + " " + channel_id_2 + " 2 44100"

# 使用多进程, 同时使用两个声道, 播放不同音频
def multi_channels_play1():
    subprocess.run(cmd1, shell=True)

def multi_channels_play2():
    subprocess.run(cmd2, shell=True)

if __name__ == "__main__":
    t1 = threading.Thread(target=multi_channels_play1)
    t2 = threading.Thread(target=multi_channels_play2)

    t1.start()
    t2.start()
最后修改:2020 年 08 月 17 日
如果觉得我的文章对你有用,请随意赞赏