Brasil - Projeção de Casos e Óbitos
Projeção do número de casos e número de mortes no Brasil para a próxima semana. Gráficos com casos e óbitos novos.
Neste relatório vamos desenvolver em Python uma projeção que pode ser atualizada em tempo real do número de casos de Covid-19 no Brasil. Devido à sua natureza com crescimento exponencial, podemos fazer projeções de curto/médio prazo de acordo com tal tendência. É importante salientar que não existe nenhuma exponencial pura. Em algum momento haverá um ponto de inflexão (quando o número de casos começa a diminuir) e esta exponencial se tornará um sigmóide (como já é o caso da China). Portanto, como não há como prever quando ocorrerá essa inflexão, as projeções somente são úteis para curto e médio prazo. Sem mais delongas, vamos começar! Primeiramente, todas as importações que serão utilizadas:
#collapse
# Todas as importações vem aqui
import numpy as np
import pandas as pd;
import matplotlib.pyplot as plt
import seaborn as sns;
from sklearn.linear_model import LinearRegression
from datetime import date
E também parâmetros:
#collapse
FIGSIZE = (8,4)
Em seguida, vamos importar a base de dados disponibilizada pelo repositório da John Hopkins University. Há duas bases relevantes para nosso caso, uma com o histórico do número de casos e outro com o histório do número de mortes (ambos obtido a partir dos relatórios diários da OMS).
#collapse
CASOS_URL = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv'
MORTES_URL = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv'
Ambos são urls diretas para arquivos .CSV, portanto podemos importá-los diretamente para a biblioteca Pandas do Python, sem precisar baixá-los:
#collapse
casos = pd.read_csv(CASOS_URL)
mortes = pd.read_csv(MORTES_URL)
Vamos visualizar o cabeçalho de cada um:
casos.head()
mortes.head()
Como para este relatório temos interesse em apenas dados do Brasil, vamos atribuir duas novas variáveis à ambos:
def filter_country(df, country):
# Filtrar pais
df = df[df['Country/Region']==country]
# Remover colunas iniciais, manter somente as datas
df = df.iloc[:, 4:]
# Transpor
df = df.T
# Redefinir coluna
df.columns = [country]
# Definir index para Datetime
df.index = pd.to_datetime(df.index)
return df
casos_brasil = filter_country(casos, 'Brazil')
mortes_brasil = filter_country(mortes, 'Brazil')
Vamos ver como ficaram os novos dataframes:
casos_brasil.tail()
mortes_brasil.tail()
Podemos facilmente também plotar tais dados para ver como estão:
casos_brasil.plot(title='Número de casos');
mortes_brasil.plot(title='Número de mortes');
Podemos conferir se o número de casos está se acelerando ou não plotando um gráfico de barras do número de casos por dia. Isso será importante para identificar a inflexão!
casos_novos_brasil = casos_brasil[1:]-casos_brasil[:-1].values
casos_novos_brasil.tail()
mortes_novas_brasil = mortes_brasil[1:]-mortes_brasil[:-1].values
mortes_novas_brasil.tail()
Em seguida vamos visualizar ambos em um gráfico de barras:
# Funcao personalizada
def plot_bar_novos(df, title):
df.index = df.index.strftime('%d/%m')#astype('str')
ax = df.plot.bar(
title=title,
figsize=FIGSIZE
)
_=plt.xticks(rotation=50)
return ax
primeiro_caso_filtro = casos_novos_brasil.values>0
ax=plot_bar_novos(casos_novos_brasil[primeiro_caso_filtro],
title='Casos novos (sem acumular, a partir do primeiro caso)',
)
# Filtrar a partir do primeiro obito
primeiro_obito_filtro = mortes_novas_brasil.values>0
# Criar plot
ax=plot_bar_novos(mortes_novas_brasil[primeiro_obito_filtro],
title='Óbitos novos (sem acumular, a partir do primeiro óbito)'
)
ax.figure.savefig('obitos_novos.png', dpi=300)
Vamos agora dar início à modelagem das projeções. Primeiramente, vamos fazer alguns plots em cima do log dos dados:
casos_brasil['Brazil'].apply(np.log).plot(marker='o', linestyle='', title='Log do Número de casos');
mortes_brasil['Brazil'].apply(np.log).plot(marker='o', linestyle='', title='Log do Número de Mortes');
casos_novos_brasil.apply(np.log).plot(marker='o', linestyle='', title='Log do número de casos novos');
mortes_novas_brasil.apply(np.log).plot(marker='o', linestyle='', title='Log de novas mortes');
Todas as curvas acima não representam uma reta perfeita, o que são ótimas notícias: Significa que talvez não esteja mais seguindo uma tendência exponencial. De qualquer forma, vou usar como ponto de partida uma regressão linear, apesar de haverem ressalvas sobre o uso dela. Por simplificação, vou começar com uma projeção do número total de casos e de mortes. Primeiramente, vou juntar todos os dados em um único dataframe:
brasil = pd.DataFrame({
'Confirmados Cumulativo': casos_brasil['Brazil'].values[1:],
'Confirmados Novos': casos_novos_brasil['Brazil'].values,
'Mortes Cumulativa': mortes_brasil['Brazil'].values[1:],
'Mortes Novas': mortes_novas_brasil['Brazil'].values
})
brasil.index = casos_novos_brasil.index
brasil.tail()
Também podemos criar um dataframe com o log de todos os dados:
brasil_log = brasil.apply(np.log1p)
brasil_log.tail()
Vamos visualizar a distribuição de cada um e também scatter plots de cada um versus o outro:
pd.plotting.scatter_matrix(brasil_log, figsize = (14,8), diagonal = 'kde');
O plot acima fica um pouco enviesado pois há um grande acúmulo de zeros. Vamos filtrar as datas a partir do promeiro óbito:
primeiro_obito = brasil_log.index[brasil_log['Mortes Cumulativa']>0][0]
primeiro_obito_filtro = brasil_log.index>=primeiro_obito
primeiro_obito
Como é uma base bastante recente, vamos também pegar o primeiro caso:
primeiro_caso = brasil_log.index[brasil_log['Confirmados Cumulativo']>0][0]
primeiro_caso_filtro = brasil_log.index>=primeiro_caso
primeiro_caso
Vamos repetir o scatter matrix acima a partir dos filtros que definimos:
g = sns.pairplot(brasil[primeiro_obito_filtro], kind="reg")
g.fig.tight_layout()
g.fig.subplots_adjust(top=0.88)
g.fig.suptitle('Grade de gráficos, dados a partir do primeiro óbito', y=0.92);
g = sns.pairplot(brasil[primeiro_caso_filtro], kind="reg")
g.fig.suptitle('Scatter Matrix com filtro a partir do primeiro caso', y=1.08);
g = sns.pairplot(brasil_log[primeiro_obito_filtro], kind="reg")
g.fig.suptitle('Scatter Matrix do Log dos dados com filtro a partir do primeiro óbito', y=1.08);
g = sns.pairplot(brasil_log[primeiro_obito_filtro], kind="reg")
g.fig.suptitle('Scatter Matrix do Log dos dados com filtro a partir do primeiro óbito', y=1.08);
Os plots acima, talvez pareçam informação irrelevante, mas o fiz para ter uma ideia sobre a distribuição dos dados e se há alguma distribuição normal em algum caso, que justificaria o uso do desvio padrão (uma vez que o mesmo é em relação à distribuição normal). De qualquer forma, para manter as coisas simples inicialmente, vou manter meu plano inicial de criar uma projeção do número de casos incluindo o intervalo do desvio padrão. Então vamos começar pegando o as estatísticas filtrando a partir do primeiro caso:
std = brasil_log[primeiro_caso_filtro].std()
Visualizar um plot da tendência que vamos modelar:
brasil_log[primeiro_caso_filtro]['Confirmados Cumulativo'].plot()
Em seguida ajustar uma regressão linear em cima do log:
def fitar(x, y):
lr = LinearRegression()
lr.fit(x,y)
return lr
x = np.arange(sum(primeiro_caso_filtro)).reshape(-1,1)
y = brasil_log[primeiro_caso_filtro]['Confirmados Cumulativo'].values
lr_casos = fitar(x, y)
def projetar(lr, x, y, plot=True):
y_pred = lr.predict(x)
if plot:
plt.scatter(x, y)
plt.plot(y_pred, 'r')
plt.title("Projeção logarítmica")
plt.show()
plt.scatter(x, np.expm1(y))
plt.plot(np.expm1(y_pred), 'r')
plt.title("Projeção exponencial")
plt.show()
return y_pred
y_pred = projetar(lr_casos, x, y)
A projeção acima não ficou boa, vamos testar suavizar com um exponential moving average:
def generate_moving_average(data, mom=0.7):
MOM = 0.7
rolling_mean = [data[0]]
for d in data[1:]:
rolling_mean.append(rolling_mean[-1]*MOM + (1-MOM)*d)
return np.array(rolling_mean)
def fitar_projetar(x, y, mom=None):
if mom is not None:
y_old = y
y = generate_moving_average(y)
lr = fitar(x, y)
y_pred = projetar(lr, x, y, plot=not mom) # Plota se mom for nulo
if mom is not None:
y_pred_exp = np.expm1(y_pred)
reversed_mv = [(rm-rm_*mom)/(1-mom)
for rm, rm_ in zip(y_pred_exp[1:], y_pred_exp[:-1])]
plt.scatter(x, np.expm1(y_old), label="Original")
plt.scatter(x, np.expm1(y), label="Exp. Moving Average")
plt.plot(y_pred_exp, 'r', label="Projeção do EMA")
plt.plot(reversed_mv, label="EMA reverso para projeção real")
plt.legend()
plt.title(f"Projeção com Exp. Moving Average (momentum = {mom})")
plt.show()
return y_pred
y_pred = fitar_projetar(x, y, mom=0.5)
Também não ficou muito boa. Um dos motivos de estas funções não estarem se saindo bem é que o erro está sendo inferido na escala logarítmica, ou seja, enquanto estamos na escala logarítmica, o erro em termos absolutos é pequeno, no entanto, quando passamos à escala normal, o erro aumenta consideravelmente. Parra corrigir isso, precisamos ajustar uma função exponencial diretamente sem realizar a transformação logarítmica. Precisamos de uma função de custo que infira o erro diretamente da exponencial. Podemos fazer isso com o auxílio de otimizadores de redes neurais. Vamos também aproveitar para ajustar uma sigmóide. E aqui um detalhe: para quem é de machine learning, as boas práticas de seleção de modelos não se aplicam aqui (por exemplo, divisão treinamento e teste). Como os modelos a serem ajustados são bastante simples, não há a necessidade de reservar um conjunto de validação/teste.
# Creating a model
from keras.models import Sequential;
from keras.layers import Dense
from keras import backend as K
from keras.optimizers import Adam
from keras.activations import sigmoid, elu
def get_keras_model(lr=0.001, activation='exp'):
if activation=='exp':
def activ_func(x):
return K.exp(x) - 1
elif activation=='sigm':
activ_func = sigmoid
elif activation=='elu':
activ_func = elu
# Usage
model = Sequential();
model.add(Dense(1, input_dim=1, activation=activ_func))
model.compile(optimizer=Adam(lr=lr), loss='mean_squared_error')
return model
# Definir entrada e saida
x = np.arange(sum(primeiro_caso_filtro)).reshape(-1,1)
y = brasil[primeiro_caso_filtro]['Confirmados Cumulativo'].values
# Preparar dados
def preparar_dados(x, y):
x_mean = x.mean()
x_std = x.std()
y_mean = y.mean()
y_std = y.std()
x_prep = (x-x_mean)/x_std
y_prep = (y-y_mean)/y_std
return x_prep, y_prep
x_prep, y_prep = preparar_dados(x, y)
#hide_output
model = get_keras_model(lr=0.1)
model.fit(x_prep, y_prep, epochs=100, verbose=0);
from sklearn.metrics import r2_score
def projetar_keras(model, x, y, days_ahead=0, plot=True,
model_func="exponencial",
tipo='caso', exibir_pontos=False,
anotar_dados_reais=True,
dados_reais_step=2):
title = f'Projeção {model_func} de {tipo}s de Covid-19 no Brasil' + f' para os próximos {days_ahead} dias'
hoje = date.today()
hoje = hoje.strftime("%d/%m")
x_mean = x.mean()
x_std = x.std()
y_mean = y.mean()
y_std = y.std()
n_days = len(x)
x_proj = np.arange(n_days+days_ahead).reshape(-1,1)
x_prep = (x_proj-x_mean)/x_std
y_prep = (y-y_mean)/y_std
y_pred = model.predict(x_prep)
y_pred = y_pred*y_std + y_mean #Reverse y_pred back
y_pred = y_pred.astype(int)
y_pred = np.clip(y_pred, 0, None)
y_pred = y_pred.squeeze()
#print(y)
#print(y_pred)
r2 = r2_score(y, y_pred[:-days_ahead])
if plot:
fig = plt.figure(figsize=FIGSIZE)
# Plotar projeção
plt.plot(x_proj[-days_ahead-1:], y_pred[-days_ahead-1:], '-',
color='red', label='Projeção')
# Plotar função
if exibir_pontos:
plt.plot(x_proj[:-days_ahead], y_pred[:-days_ahead], '-', color='red',
label=f'Ajuste {model_func} (R²={r2:.3f})', alpha=0.4)
#plt.scatter(x, y, label='Dados reais', color='orange')
plt.plot(y, '.', label='Dados reais')
plt.title(title)
for x_anot, y_anot in zip(x_proj[-days_ahead:], y_pred[-days_ahead:]):
plt.annotate(y_anot, (x_anot, y_anot), ha='right', color='red')
plt.annotate(f'({hoje}): {y[-1]}', (x[-1], y[-1]), ha='right',
color='black',
textcoords="offset points",
xytext=(-5,0))
if anotar_dados_reais:
for x_anot, y_anot in zip(x[:-1:dados_reais_step], y[:-1:dados_reais_step]):
plt.annotate(y_anot, (x_anot, y_anot), ha='right',
color='midnightblue',
textcoords="offset points",
xytext=(-5,3)
)
plt.grid(color='black', linestyle='--', linewidth=0.17)
plt.legend()
plt.xlabel(f"Dias desde o primeiro {tipo} no Brasil")
plt.ylabel(f'{tipo.capitalize()}s confirmados acumulados')
plt.xticks(x_proj[::2])
plt.show()
return y_pred, fig
y_pred, fig = projetar_keras(model, x, y, days_ahead=7,
exibir_pontos=True,
anotar_dados_reais=False,
dados_reais_step=3)
Por curiosidade, vamos testar ajustar um sigmóide:
#hide_output
model = get_keras_model(lr=1, activation='sigm')
model.fit(x_prep, y_prep, epochs=100, verbose=0);
y_pred, fig = projetar_keras(model, x, y, days_ahead=5, model_func='sigmóide', exibir_pontos=True)
Bem distante do esperado :) Vamos testar também o ELU:
#hide_output
model = get_keras_model(lr=0.1, activation='elu')
model.fit(x_prep, y_prep, epochs=100, verbose=0);
y_pred, fig = projetar_keras(model, x, y, days_ahead=7, model_func="ELU (Exp. Linear Unit)", exibir_pontos=True)
Um pouco melhor, mas neste caso com tendência linear. Vamos também replicar estes dois últimos para o número de óbitos.
# Definir entrada e saida
x = np.arange(sum(primeiro_obito_filtro)).reshape(-1,1)
y = brasil[primeiro_obito_filtro]['Mortes Cumulativa'].values
x_prep, y_prep = preparar_dados(x, y)
#hide_output
model = get_keras_model(lr=0.1, activation='exp')
model.fit(x_prep, y_prep, epochs=100, verbose=0);
y_pred, fig = projetar_keras(model, x, y, days_ahead=7, model_func="exponencial",
tipo='obito',
exibir_pontos=True,
anotar_dados_reais=False,
dados_reais_step=3)
#hide_output
model = get_keras_model(lr=0.1, activation='elu')
model.fit(x_prep, y_prep, epochs=100, verbose=0);
y_pred, fig = projetar_keras(model, x, y,
days_ahead=7,
model_func="ELU (Exp. Linear Unit)",
tipo='obito',
exibir_pontos=True)