diart.blocks#

Package Contents#

Classes#

AggregationStrategy

Abstract class representing a strategy to aggregate overlapping buffers

HammingWeightedAverageStrategy

Compute the average weighted by the Hamming window aligned to each buffer

AverageStrategy

Compute a simple average over the focus region

FirstOnlyStrategy

Instead of aggregating, keep the first focus region in the buffer list

DelayedAggregation

Aggregate aligned overlapping windows of the same duration across sliding buffers with a specific step and latency.

OnlineSpeakerClustering

Implements constrained incremental online clustering of speakers and manages cluster centers.

SpeakerEmbedding

OverlappedSpeechPenalty

Applies a penalty on overlapping speech and low-confidence regions to speaker segmentation scores.

EmbeddingNormalization

OverlapAwareSpeakerEmbedding

Extract overlap-aware speaker embeddings given an audio chunk and its segmentation.

SpeakerSegmentation

SpeakerDiarization

Represents a streaming audio pipeline

SpeakerDiarizationConfig

Configuration containing the required parameters to build and run a pipeline

PipelineConfig

Configuration containing the required parameters to build and run a pipeline

Pipeline

Represents a streaming audio pipeline

Binarize

Transform a speaker segmentation from the discrete-time domain into a continuous-time speaker segmentation.

Resample

Dynamically resample audio chunks.

AdjustVolume

Change the volume of an audio chunk.

VoiceActivityDetection

Represents a streaming audio pipeline

VoiceActivityDetectionConfig

Configuration containing the required parameters to build and run a pipeline

class diart.blocks.AggregationStrategy(cropping_mode='loose')#

Bases: abc.ABC

Abstract class representing a strategy to aggregate overlapping buffers

Parameters:

cropping_mode (("strict", "loose", "center"), optional) – Defines how buffer chunks are cropped, as in pyannote.core (see https://pyannote.github.io/pyannote-core/reference.html#pyannote.core.SlidingWindowFeature.crop). Defaults to “loose”.

static build(name, cropping_mode='loose')#

Build an AggregationStrategy instance based on its name

Parameters:
  • name (typing_extensions.Literal[mean, hamming, first]) –

  • cropping_mode (typing_extensions.Literal[strict, loose, center]) –

Return type:

AggregationStrategy

__call__(buffers, focus)#

Aggregate chunks over a specific region.

Parameters:
  • buffers (list of SlidingWindowFeature, shapes (frames, speakers)) – Buffers to aggregate

  • focus (Segment) – Region to aggregate that is shared among the buffers

Returns:

aggregation – Aggregated values over the focus region

Return type:

SlidingWindowFeature, shape (cropped_frames, speakers)

abstract aggregate(buffers, focus)#
Parameters:
  • buffers (List[pyannote.core.SlidingWindowFeature]) –

  • focus (pyannote.core.Segment) –

Return type:

numpy.ndarray

class diart.blocks.HammingWeightedAverageStrategy(cropping_mode='loose')#

Bases: AggregationStrategy

Compute the average weighted by the Hamming window aligned to each buffer

Parameters:

cropping_mode (typing_extensions.Literal[strict, loose, center]) –

aggregate(buffers, focus)#
Parameters:
  • buffers (List[pyannote.core.SlidingWindowFeature]) –

  • focus (pyannote.core.Segment) –

Return type:

numpy.ndarray

class diart.blocks.AverageStrategy(cropping_mode='loose')#

Bases: AggregationStrategy

Compute a simple average over the focus region

Parameters:

cropping_mode (typing_extensions.Literal[strict, loose, center]) –

aggregate(buffers, focus)#
Parameters:
  • buffers (List[pyannote.core.SlidingWindowFeature]) –

  • focus (pyannote.core.Segment) –

Return type:

numpy.ndarray

class diart.blocks.FirstOnlyStrategy(cropping_mode='loose')#

Bases: AggregationStrategy

Instead of aggregating, keep the first focus region in the buffer list

Parameters:

cropping_mode (typing_extensions.Literal[strict, loose, center]) –

aggregate(buffers, focus)#
Parameters:
  • buffers (List[pyannote.core.SlidingWindowFeature]) –

  • focus (pyannote.core.Segment) –

Return type:

numpy.ndarray
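
The three strategies differ only in how the cropped buffers are combined over the focus region. The sketch below illustrates that idea in plain Python and is not diart's implementation. It makes several simplifying assumptions: buffers are hypothetical `(start_frame, scores)` pairs for a single speaker, the focus is a `(first_frame, end_frame)` range, and cropping modes are ignored.

```python
import math


def hamming(n):
    """Symmetric Hamming window of length n (same formula as numpy.hamming)."""
    if n == 1:
        return [1.0]
    return [0.54 - 0.46 * math.cos(2 * math.pi * i / (n - 1)) for i in range(n)]


def crop(values, start, focus):
    """Slice of a buffer (beginning at frame `start`) that covers `focus`."""
    first, end = focus
    return values[first - start:end - start]


def aggregate(buffers, focus, strategy="hamming"):
    """Combine overlapping buffers over the focus region.

    buffers: list of (start_frame, scores); focus: (first_frame, end_frame).
    """
    crops = [crop(scores, start, focus) for start, scores in buffers]
    if strategy == "first":
        return crops[0]  # no aggregation: keep the first buffer's view
    if strategy == "mean":
        weights = [[1.0] * len(c) for c in crops]
    else:
        # "hamming": each buffer is weighted by its own window, cropped
        # to the focus so that the weights stay aligned with the scores.
        weights = [crop(hamming(len(scores)), start, focus) for start, scores in buffers]
    aggregated = []
    for frame in range(len(crops[0])):
        numerator = sum(w[frame] * c[frame] for w, c in zip(weights, crops))
        denominator = sum(w[frame] for w in weights)
        aggregated.append(numerator / denominator)
    return aggregated


# Two 8-frame buffers shifted by 2 frames; both cover frames 4 and 5.
buffers = [(0, [0.2] * 8), (2, [0.8] * 8)]
focus = (4, 6)
first_out = aggregate(buffers, focus, "first")
mean_out = aggregate(buffers, focus, "mean")
hamming_out = aggregate(buffers, focus, "hamming")
```

With Hamming weighting, each frame leans towards the buffer in which it sits closer to the window center, whereas the simple mean treats both buffers equally.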

class diart.blocks.DelayedAggregation(step, latency=None, strategy='hamming', cropping_mode='loose')#

Aggregate aligned overlapping windows of the same duration across sliding buffers with a specific step and latency.

Parameters:
  • step (float) – Shift between two consecutive buffers, in seconds.

  • latency (float, optional) – Desired latency, in seconds. Defaults to step. The higher the latency, the more overlapping windows to aggregate.

  • strategy (("mean", "hamming", "first"), optional) – Specifies how to aggregate overlapping windows. Defaults to “hamming”. “mean”: simple average; “hamming”: average weighted by the Hamming window values (aligned to the buffer); “first”: no aggregation, pick the first overlapping window.

  • cropping_mode (("strict", "loose", "center"), optional) – Defines how buffer chunks are cropped, as in pyannote.core (see https://pyannote.github.io/pyannote-core/reference.html#pyannote.core.SlidingWindowFeature.crop). Defaults to “loose”.

Example

>>> import numpy as np
>>> from pyannote.core import SlidingWindow, SlidingWindowFeature
>>> from diart.blocks import DelayedAggregation
>>> duration = 5
>>> frames = 500
>>> step = 0.5
>>> speakers = 2
>>> start_time = 10
>>> resolution = duration / frames
>>> dagg = DelayedAggregation(step=step, latency=2, strategy="mean")
>>> buffers = [
...     SlidingWindowFeature(
...         np.random.rand(frames, speakers),
...         SlidingWindow(start=(i + start_time) * step, duration=resolution, step=resolution)
...     )
...     for i in range(dagg.num_overlapping_windows)
... ]
>>> dagg.num_overlapping_windows
4
>>> dagg(buffers).data.shape  # Rounding errors are possible when cropping the buffers
(51, 2)

_prepend(output_window, output_region, buffers)#
Parameters:
  • output_window (pyannote.core.SlidingWindowFeature) –

  • output_region (pyannote.core.Segment) –

  • buffers (List[pyannote.core.SlidingWindowFeature]) –

__call__(buffers)#
Parameters:

buffers (List[pyannote.core.SlidingWindowFeature]) –

Return type:

pyannote.core.SlidingWindowFeature
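
The relation between `step`, `latency` and the number of aggregated buffers can be sketched as follows. This is an illustration consistent with the example above (latency 2 and step 0.5 give 4 overlapping windows), not a copy of the library code. Both helper functions are hypothetical, and the assumption that the emitted region starts `latency` seconds before the end of the newest buffer is inferred from the documented meaning of latency.

```python
def num_overlapping_windows(step, latency=None):
    """Number of buffers that overlap the region being emitted."""
    if latency is None:
        latency = step  # documented default: latency defaults to step
    return int(round(latency / step))


def output_region(last_buffer_end, step, latency):
    """(start, end) in seconds of the region emitted for the newest buffer.

    Assumption: the region lies `latency` seconds behind the end of the
    newest buffer and is `step` seconds long.
    """
    start = last_buffer_end - latency
    return (start, start + step)


# Values from the example above: 5 s buffers, step 0.5 s, latency 2 s.
n = num_overlapping_windows(step=0.5, latency=2)
# The newest of the n buffers starts at (n - 1 + 10) * 0.5 = 6.5 s and ends at 11.5 s.
region = output_region(last_buffer_end=6.5 + 5, step=0.5, latency=2)
```

Under these assumptions the region is 0.5 s long, which at a resolution of 0.01 s per frame corresponds to about 50 frames, in line with the `(51, 2)` shape reported above for loose cropping.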

class diart.blocks.OnlineSpeakerClustering(tau_active, rho_update, delta_new, metric='cosine', max_speakers=20)#

Implements constrained incremental online clustering of speakers and manages cluster centers.

Parameters:
  • tau_active (float) – Threshold for detecting active speakers. This threshold is applied on the maximum value of per-speaker output activation of the local segmentation model.

  • rho_update (float) – Threshold for considering the extracted embedding when updating the centroid of the local speaker. The centroid to which a local speaker is mapped is only updated if the ratio of speech/chunk duration of a given local speaker is greater than this threshold.

  • delta_new (float) – Threshold on the distance between a speaker embedding and a centroid. If the distance between a local speaker and all centroids is larger than delta_new, then a new centroid is created for the current speaker.

  • metric (str, optional) – The distance metric to use. Defaults to "cosine".

  • max_speakers (int, optional) – Maximum number of global speakers to track through a conversation. Defaults to 20.

property num_free_centers: int#
Return type:

int

property num_known_speakers: int#
Return type:

int

property num_blocked_speakers: int#
Return type:

int

property inactive_centers: List[int]#
Return type:

List[int]

get_next_center_position()#
Return type:

Optional[int]

init_centers(dimension)#

Initializes the speaker centroid matrix

Parameters:

dimension (int) – Dimension of embeddings used for representing a speaker.

update(assignments, embeddings)#

Updates the speaker centroids given a list of assignments and local speaker embeddings

Parameters:
  • assignments (Iterable[Tuple[int, int]]) – An iterable of two-element tuples, where the first element is the source speaker and the second element is the target speaker.

  • embeddings (np.ndarray, shape (local_speakers, embedding_dim)) – Matrix containing embeddings for all local speakers.

add_center(embedding)#

Add a new speaker centroid initialized to a given embedding

Parameters:

embedding (np.ndarray) – Embedding vector of some local speaker

Returns:

center_index – Index of the created center

Return type:

int

identify(segmentation, embeddings)#

Identify the centroids to which the input speaker embeddings belong.

Parameters:
  • segmentation (np.ndarray, shape (frames, local_speakers)) – Matrix of segmentation outputs

  • embeddings (np.ndarray, shape (local_speakers, embedding_dim)) – Matrix of embeddings

Returns:

speaker_map – A mapping from local speakers to global speakers.

Return type:

SpeakerMap

__call__(segmentation, embeddings)#
Parameters:
  • segmentation (pyannote.core.SlidingWindowFeature) –

  • embeddings (torch.Tensor) –

Return type:

pyannote.core.SlidingWindowFeature
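
To make the roles of `tau_active`, `rho_update` and `delta_new` concrete, here is a deliberately simplified, greedy sketch in plain Python. It is not the library's algorithm: the class `ToyOnlineClustering` is hypothetical, the real block may solve the local-to-global assignment jointly rather than greedily, and the speech ratio and the centroid update rule (a running sum) are assumptions made for illustration.

```python
import math


def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norms


class ToyOnlineClustering:
    """Greedy, simplified stand-in for constrained incremental clustering."""

    def __init__(self, tau_active, rho_update, delta_new, max_speakers=20):
        self.tau_active = tau_active
        self.rho_update = rho_update
        self.delta_new = delta_new
        self.max_speakers = max_speakers
        self.centers = []  # one centroid (list of floats) per global speaker

    def identify(self, segmentation, embeddings):
        """Map local speakers to global speakers for one chunk.

        segmentation: list of frames, each a list of per-speaker activations.
        embeddings: one embedding (list of floats) per local speaker.
        """
        mapping = {}
        for local, emb in enumerate(embeddings):
            activations = [frame[local] for frame in segmentation]
            if max(activations) < self.tau_active:
                continue  # inactive local speaker: ignored
            # Assumption: speech ratio = fraction of frames above tau_active.
            ratio = sum(a >= self.tau_active for a in activations) / len(activations)
            # Constraint: two local speakers cannot share a global speaker.
            taken = set(mapping.values())
            candidates = [
                (cosine_distance(emb, center), index)
                for index, center in enumerate(self.centers)
                if index not in taken
            ]
            best = min(candidates, default=None)
            if best is not None and best[0] <= self.delta_new:
                target = best[1]
                if ratio > self.rho_update:
                    # Only speakers with enough speech update the centroid.
                    self.centers[target] = [c + e for c, e in zip(self.centers[target], emb)]
            elif len(self.centers) < self.max_speakers:
                self.centers.append(list(emb))  # too far from every centroid
                target = len(self.centers) - 1
            elif best is not None:
                target = best[1]  # no free slot: fall back to the closest centroid
            else:
                continue
            mapping[local] = target
        return mapping


clustering = ToyOnlineClustering(tau_active=0.5, rho_update=0.3, delta_new=0.5)
first = clustering.identify([[0.9, 0.0]] * 4, [[1.0, 0.0], [0.0, 1.0]])
second = clustering.identify([[0.0, 0.9]] * 4, [[1.0, 0.0], [0.0, 1.0]])
third = clustering.identify([[0.8, 0.1]] * 4, [[0.9, 0.1], [0.0, 1.0]])
```

In this toy run the first chunk creates a centroid for its only active speaker, the second chunk creates another one because its active speaker is farther than `delta_new` from the existing centroid, and the third chunk is mapped back to the first centroid.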

class diart.blocks.SpeakerEmbedding(model, device=None)#
static from_pretrained(model, use_hf_token=True, device=None)#
Parameters:
  • use_hf_token (Union[Text, bool, None]) –

  • device (Optional[torch.device]) –

Return type:

SpeakerEmbedding

__call__(waveform, weights=None)#

Calculate speaker embeddings of input audio. If weights are given, calculate many speaker embeddings from the same waveform.

Parameters:
  • waveform (TemporalFeatures, shape (samples, channels) or (batch, samples, channels)) –

  • weights (Optional[TemporalFeatures], shape (frames, speakers) or (batch, frames, speakers)) – Per-speaker and per-frame weights. Defaults to no weights.

Returns:

embeddings – If weights are provided, the shape is (batch, speakers, embedding_dim), otherwise the shape is (batch, embedding_dim). If batch size == 1, the batch dimension is omitted.

Return type:

torch.Tensor

class diart.blocks.OverlappedSpeechPenalty(gamma=3, beta=10, normalize=False)#

Applies a penalty on overlapping speech and low-confidence regions to speaker segmentation scores.

Note

For more information, see “Overlap-Aware Low-Latency Online Speaker Diarization based on End-to-End Local Segmentation” (Section 2.2.1 Segmentation-driven speaker embedding). This block implements Equation 2.

Parameters:
  • gamma (float, optional) – Exponent to lower low-confidence predictions. Defaults to 3.

  • beta (float, optional) – Temperature parameter (actually 1/beta) to lower joint speaker activations. Defaults to 10.

  • normalize (bool, optional) – Whether to min-max normalize weights to be in the range [0, 1]. Defaults to False.

__call__(segmentation)#
Parameters:

segmentation (diart.features.TemporalFeatures) –

Return type:

diart.features.TemporalFeatures
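
A minimal sketch of how `gamma` and `beta` could act on the segmentation scores, written in plain Python. It assumes that the penalty multiplies each raw score by a softmax over speakers computed with inverse temperature `beta`, with both factors raised to `gamma`; this form is an assumption based on the parameter descriptions above, so the cited paper should be consulted for the exact equation. The optional min-max normalization is omitted.

```python
import math


def softmax(values):
    peak = max(values)  # subtract the maximum for numerical stability
    exps = [math.exp(v - peak) for v in values]
    total = sum(exps)
    return [e / total for e in exps]


def overlapped_speech_penalty(segmentation, gamma=3, beta=10):
    """Per-frame, per-speaker weights from segmentation scores in [0, 1].

    segmentation: list of frames, each a list of per-speaker scores.
    """
    weights = []
    for frame in segmentation:
        # A large beta sharpens the softmax, so joint activations are pushed down.
        probs = softmax([beta * score for score in frame])
        weights.append([(s ** gamma) * (p ** gamma) for s, p in zip(frame, probs)])
    return weights


segmentation = [
    [0.9, 0.1],  # speaker 0 alone, confident
    [0.9, 0.9],  # both speakers active: overlapped speech
    [0.4, 0.1],  # speaker 0 alone, low confidence
]
weights = overlapped_speech_penalty(segmentation)
```

Under these assumptions, speaker 0 receives a much larger weight in the first frame than in the overlapped or low-confidence frames.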

class diart.blocks.EmbeddingNormalization(norm=1)#
Parameters:

norm (Union[float, torch.Tensor]) –

__call__(embeddings)#
Parameters:

embeddings (torch.Tensor) –

Return type:

torch.Tensor
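
As an illustration of what rescaling an embedding to a target norm means, here is a small plain-Python sketch. The function `normalize_embedding` is hypothetical and handles a single vector with a scalar `norm`; the actual block works on tensors and may accept a different norm per speaker.

```python
import math


def normalize_embedding(embedding, norm=1.0):
    """Rescale a vector so that its Euclidean length equals `norm`."""
    length = math.sqrt(sum(x * x for x in embedding))
    return [norm * x / length for x in embedding]


unit = normalize_embedding([3.0, 4.0])
scaled = normalize_embedding([3.0, 4.0], norm=2.0)
```

The direction of the vector is preserved and only its length changes, so cosine distances between embeddings are unaffected.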

class diart.blocks.OverlapAwareSpeakerEmbedding(model, gamma=3, beta=10, norm=1, normalize_weights=False, device=None)#

Extract overlap-aware speaker embeddings given an audio chunk and its segmentation.

Parameters:
  • model (EmbeddingModel) – A pre-trained embedding model.

  • gamma (float, optional) – Exponent to lower low-confidence predictions. Defaults to 3.

  • beta (float, optional) – Softmax’s temperature parameter (actually 1/beta) to lower joint speaker activations. Defaults to 10.

  • norm (float or torch.Tensor of shape (batch, speakers, 1) where batch is optional) – The target norm for the embeddings. It can be different for each speaker. Defaults to 1.

  • normalize_weights (bool, optional) – Whether to min-max normalize embedding weights to be in the range [0, 1]. Defaults to False.

  • device (Optional[torch.device]) – The device on which to run the embedding model. Defaults to GPU if available or CPU if not.

static from_pretrained(model, gamma=3, beta=10, norm=1, use_hf_token=True, normalize_weights=False, device=None)#
Parameters:
  • gamma (float) –

  • beta (float) –

  • norm (Union[float, torch.Tensor]) –

  • use_hf_token (Union[Text, bool, None]) –

  • normalize_weights (bool) –

  • device (Optional[torch.device]) –

__call__(waveform, segmentation)#
Parameters:
  • waveform (diart.features.TemporalFeatures) –

  • segmentation (diart.features.TemporalFeatures) –

Return type:

torch.Tensor

class diart.blocks.SpeakerSegmentation(model, device=None)#
static from_pretrained(model, use_hf_token=True, device=None)#
Parameters:
  • use_hf_token (Union[Text, bool, None]) –

  • device (Optional[torch.device]) –

Return type:

SpeakerSegmentation

__call__(waveform)#

Calculate the speaker segmentation of input audio.

Parameters:

waveform (TemporalFeatures, shape (samples, channels) or (batch, samples, channels)) –

Returns:

speaker_segmentation – The batch dimension is omitted if waveform is a SlidingWindowFeature.

Return type:

TemporalFeatures, shape (batch, frames, speakers)

class diart.blocks.SpeakerDiarization(config=None)#

Bases: diart.blocks.base.Pipeline

Represents a streaming audio pipeline

Parameters:

config (SpeakerDiarizationConfig | None) –

property config: SpeakerDiarizationConfig#
Return type:

SpeakerDiarizationConfig

static get_config_class()#
Return type:

type

static suggest_metric()#
Return type:

pyannote.metrics.base.BaseMetric

static hyper_parameters()#
Return type:

Sequence[diart.blocks.base.HyperParameter]

set_timestamp_shift(shift)#
Parameters:

shift (float) –

reset()#
__call__(waveforms)#

Diarize the next audio chunks of an audio stream.

Parameters:

waveforms (Sequence[SlidingWindowFeature]) – A sequence of consecutive audio chunks from an audio stream.

Returns:

Speaker diarization of each chunk alongside their corresponding audio.

Return type:

Sequence[tuple[Annotation, SlidingWindowFeature]]

class diart.blocks.SpeakerDiarizationConfig(segmentation=None, embedding=None, duration=5, step=0.5, latency=None, tau_active=0.6, rho_update=0.3, delta_new=1, gamma=3, beta=10, max_speakers=20, normalize_embedding_weights=False, device=None, sample_rate=16000, **kwargs)#

Bases: diart.blocks.base.PipelineConfig

Configuration containing the required parameters to build and run a pipeline

Parameters:
  • segmentation (diart.models.SegmentationModel | None) –

  • embedding (diart.models.EmbeddingModel | None) –

  • duration (float) –

  • step (float) –

  • latency (float | typing_extensions.Literal[max, min] | None) –

  • tau_active (float) –

  • rho_update (float) –

  • delta_new (float) –

  • gamma (float) –

  • beta (float) –

  • max_speakers (int) –

  • normalize_embedding_weights (bool) –

  • device (torch.device | None) –

  • sample_rate (int) –

property duration: float#

The duration of an input audio chunk (in seconds)

Return type:

float

property step: float#

The step between two consecutive input audio chunks (in seconds)

Return type:

float

property latency: float#

The algorithmic latency of the pipeline (in seconds). At time t of the audio stream, the pipeline will output predictions for time t - latency.

Return type:

float

property sample_rate: int#

The sample rate of the input audio stream

Return type:

int
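
The sketch below shows how the timing parameters relate to each other. Both helpers are hypothetical. In particular, the mapping of the `latency` literals is an assumption: `"min"` and `None` are taken to mean one step, and `"max"` the full chunk duration.

```python
def resolve_latency(latency, duration, step):
    """Turn the `latency` argument into a value in seconds.

    Assumption: None and "min" mean `step`, "max" means `duration`.
    """
    if latency is None or latency == "min":
        return step
    if latency == "max":
        return duration
    return float(latency)


def chunk_sizes(duration, step, sample_rate):
    """Chunk length and hop between chunks, both in samples."""
    return int(round(duration * sample_rate)), int(round(step * sample_rate))


# Default configuration values: duration=5, step=0.5, sample_rate=16000.
default_latency = resolve_latency(None, duration=5, step=0.5)
max_latency = resolve_latency("max", duration=5, step=0.5)
chunk_samples, step_samples = chunk_sizes(duration=5, step=0.5, sample_rate=16000)
```

With the defaults, each chunk holds 80000 samples and consecutive chunks are 8000 samples apart, so predictions for time t - latency can be emitted every 0.5 s.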

class diart.blocks.PipelineConfig#

Bases: abc.ABC

Configuration containing the required parameters to build and run a pipeline

abstract property duration: float#

The duration of an input audio chunk (in seconds)

Return type:

float

abstract property step: float#

The step between two consecutive input audio chunks (in seconds)

Return type:

float

abstract property latency: float#

The algorithmic latency of the pipeline (in seconds). At time t of the audio stream, the pipeline will output predictions for time t - latency.

Return type:

float

abstract property sample_rate: int#

The sample rate of the input audio stream

Return type:

int

get_file_padding(filepath)#
Parameters:

filepath (diart.audio.FilePath) –

Return type:

Tuple[float, float]

class diart.blocks.Pipeline#

Bases: abc.ABC

Represents a streaming audio pipeline

abstract property config: PipelineConfig#
Return type:

PipelineConfig

abstract static get_config_class()#
Return type:

type

abstract static suggest_metric()#
Return type:

pyannote.metrics.base.BaseMetric

abstract static hyper_parameters()#
Return type:

Sequence[HyperParameter]

abstract reset()#
abstract set_timestamp_shift(shift)#
Parameters:

shift (float) –

abstract __call__(waveforms)#

Runs the next steps of the pipeline given a list of consecutive audio chunks.

Parameters:

waveforms (Sequence[SlidingWindowFeature]) – Consecutive chunk waveforms for the pipeline to ingest

Returns:

For each input waveform, a tuple containing the pipeline output and its respective audio

Return type:

Sequence[Tuple[Any, SlidingWindowFeature]]

class diart.blocks.Binarize(threshold, uri=None)#

Transform a speaker segmentation from the discrete-time domain into a continuous-time speaker segmentation.

Parameters:
  • threshold (float) – Probability threshold to determine if a speaker is active at a given frame.

  • uri (Optional[Text]) – URI of the audio stream. Defaults to no URI.

__call__(segmentation)#

Return the continuous-time segmentation corresponding to the discrete-time input segmentation.

Parameters:

segmentation (SlidingWindowFeature) – Discrete-time speaker segmentation.

Returns:

annotation – Continuous-time speaker segmentation.

Return type:

Annotation
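
The conversion from frame-level scores to time segments can be sketched for a single speaker as follows. This is an illustration rather than the block's implementation: the function `binarize` is hypothetical, a speaker is assumed to be active when the score is strictly above the threshold, and frames are treated as consecutive intervals of `resolution` seconds.

```python
def binarize(scores, threshold, start=0.0, resolution=0.01):
    """Turn per-frame scores of one speaker into (start, end) segments in seconds."""
    segments = []
    onset = None  # index of the first frame of the current active run
    for index, score in enumerate(scores):
        active = score > threshold
        if active and onset is None:
            onset = index
        elif not active and onset is not None:
            segments.append((start + onset * resolution, start + index * resolution))
            onset = None
    if onset is not None:
        # The speaker is still active at the end of the chunk.
        segments.append((start + onset * resolution, start + len(scores) * resolution))
    return segments


segments = binarize([0.1, 0.7, 0.8, 0.2, 0.9], threshold=0.5, resolution=0.5)
```

Here frames 1 and 2 form one segment and frame 4 another; the real block returns a pyannote `Annotation` covering all speakers rather than a list of tuples.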

class diart.blocks.Resample(sample_rate, resample_rate, device=None)#

Dynamically resample audio chunks.

Parameters:
  • sample_rate (int) – Original sample rate of the input audio

  • resample_rate (int) – Sample rate of the output

  • device (Optional[torch.device]) –

__call__(waveform)#
Parameters:

waveform (diart.features.TemporalFeatures) –

Return type:

diart.features.TemporalFeatures

class diart.blocks.AdjustVolume(volume_in_db)#

Change the volume of an audio chunk.

Note that the output volume may differ from the target in order to avoid saturation.

Parameters:

volume_in_db (float) – Target volume in dB.

static get_volumes(waveforms)#

Compute the volumes of a set of audio chunks.

Parameters:

waveforms (torch.Tensor) – Audio chunks. Shape (batch, samples, channels).

Returns:

volumes – Audio chunk volumes per channel. Shape (batch, 1, channels)

Return type:

torch.Tensor

__call__(waveform)#
Parameters:

waveform (diart.features.TemporalFeatures) –

Return type:

diart.features.TemporalFeatures
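
The following plain-Python sketch illustrates why the output volume can differ from the target. It assumes that volume is measured as mean power in dB and that samples are rescaled whenever the peak would exceed 1; both helpers are hypothetical, operate on a single channel, and do not handle silent input.

```python
import math


def volume_db(samples):
    """Mean power of a single-channel chunk, in dB (assumed volume measure)."""
    power = sum(s * s for s in samples) / len(samples)
    return 10 * math.log10(power)


def adjust_volume(samples, target_db):
    """Apply the gain needed to reach `target_db`, then avoid saturation."""
    gain = 10 ** ((target_db - volume_db(samples)) / 20)
    scaled = [gain * s for s in samples]
    # If the gain pushes the peak above 1, scale everything back down.
    peak = max(1.0, max(abs(s) for s in scaled))
    return [s / peak for s in scaled]


quiet = [0.1, -0.1, 0.1, -0.1]  # about -20 dB
louder = adjust_volume(quiet, target_db=-14)  # reachable without saturation
limited = adjust_volume(quiet, target_db=6)  # would saturate, so it is scaled back
```

In the last call the requested 6 dB cannot be reached without exceeding the [-1, 1] range, so the result ends up quieter than the target.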

class diart.blocks.VoiceActivityDetection(config=None)#

Bases: diart.blocks.base.Pipeline

Represents a streaming audio pipeline

Parameters:

config (VoiceActivityDetectionConfig | None) –

property config: diart.blocks.base.PipelineConfig#
Return type:

diart.blocks.base.PipelineConfig

static get_config_class()#
Return type:

type

static suggest_metric()#
Return type:

pyannote.metrics.base.BaseMetric

static hyper_parameters()#
Return type:

Sequence[diart.blocks.base.HyperParameter]

reset()#
set_timestamp_shift(shift)#
Parameters:

shift (float) –

__call__(waveforms)#

Runs the next steps of the pipeline given a list of consecutive audio chunks.

Parameters:

waveforms (Sequence[SlidingWindowFeature]) – Consecutive chunk waveforms for the pipeline to ingest

Returns:

For each input waveform, a tuple containing the pipeline output and its respective audio

Return type:

Sequence[Tuple[Any, SlidingWindowFeature]]

class diart.blocks.VoiceActivityDetectionConfig(segmentation=None, duration=5, step=0.5, latency=None, tau_active=0.6, device=None, sample_rate=16000, **kwargs)#

Bases: diart.blocks.base.PipelineConfig

Configuration containing the required parameters to build and run a voice activity detection pipeline

Parameters:
  • segmentation (diart.models.SegmentationModel | None) –

  • duration (float) –

  • step (float) –

  • latency (float | typing_extensions.Literal[max, min] | None) –

  • tau_active (float) –

  • device (torch.device | None) –

  • sample_rate (int) –

property duration: float#

The duration of an input audio chunk (in seconds)

Return type:

float

property step: float#

The step between two consecutive input audio chunks (in seconds)

Return type:

float

property latency: float#

The algorithmic latency of the pipeline (in seconds). At time t of the audio stream, the pipeline will output predictions for time t - latency.

Return type:

float

property sample_rate: int#

The sample rate of the input audio stream

Return type:

int