8000 Release 2.2.5 by yang-xiaodong · Pull Request #162 · dotnetcore/CAP · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Release 2.2.5 #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/version.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>2</VersionMinor>
<VersionPatch>4</VersionPatch>
<VersionPatch>5</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
Expand Down
8 changes: 7 additions & 1 deletion samples/Sample.RabbitMQ.MySql/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Builder;
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand All @@ -16,6 +17,11 @@ public void ConfigureServices(IServiceCollection services)
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.FailedRetryCount = 5;
x.FailedThresholdCallback = (type, name, content) =>
{
Console.WriteLine($@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {name}, message body: {content}");
};
});

services.AddMvc();
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

Expand Down Expand Up @@ -80,7 +80,7 @@ public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

Expand Down Expand Up @@ -77,7 +77,7 @@ public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
Expand Down
5 changes: 4 additions & 1 deletion src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public class RabbitMQOptions
/// <summary> The topic exchange type. </summary>
public const string ExchangeType = "topic";

/// <summary>The host to connect to.</summary>
/// <summary>
/// The host to connect to.
/// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
/// </summary>
public string HostName { get; set; } = "localhost";

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ private static Func<IConnection> CreateConnection(RabbitMQOptions options)
{
var factory = new ConnectionFactory
{
HostName = options.HostName,
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
Expand All @@ -86,6 +85,13 @@ private static Func<IConnection> CreateConnection(RabbitMQOptions options)
SocketWriteTimeout = options.SocketWriteTimeout
};

if (options.HostName.Contains(","))
{
return () => factory.CreateConnection(
options.HostName.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries));
}

factory.HostName = options.HostName;
return () => factory.CreateConnection();
}

Expand Down
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";

Expand Down Expand Up @@ -78,7 +78,7 @@ public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
Expand Down
9 changes: 5 additions & 4 deletions src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private async Task PublishWithTransAsync<T>(string name, T contentObj, string ca
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
Expand All @@ -204,10 +204,11 @@ private void PublishWithTrans<T>(string name, T contentObj, string callbackName

try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
Console.WriteLine("================22222222222222=====================");
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);

var id = Execute(DbConnection, DbTransaction, message);

Console.WriteLine("================777777777777777777777=====================");
ClosedCap();

if (id > 0)
Expand All @@ -220,7 +221,7 @@ private void PublishWithTrans<T>(string name, T contentObj, string callbackName
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
_logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
Expand Down
11 changes: 3 additions & 8 deletions src/DotNetCore.CAP/Dashboard/DashboardRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,14 @@ internal sealed class CapDashboardRequest : DashboardRequest

public CapDashboardRequest(HttpContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}

_context = context;
_context = context ?? throw new ArgumentNullException(nameof(context));
}

public override string Method => _context.Request.Method;
public override string Path => _context.Request.Path.Value;
public override string PathBase => _context.Request.PathBase.Value;
public override string LocalIpAddress => _context.Connection.LocalIpAddress.ToString();
public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.ToString();
public override string LocalIpAddress => _context.Connection.LocalIpAddress.MapToIPv4().ToString();
public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.MapToIPv4().ToString();

public override string GetQuery(string key)
{
Expand Down
20 changes: 15 additions & 5 deletions src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using System.Reflection;
using DotNetCore.CAP.Dashboard.Pages;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.Dashboard
{
Expand Down Expand Up @@ -83,24 +83,34 @@ static DashboardRoutes()
Routes.AddJsonResult("/published/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetPublishedMessageAsync(id).GetAwaiter().GetResult();
var message = x.Storage.GetConnection().GetPublishedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
});
Routes.AddJsonResult("/received/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetReceivedMessageAsync(id).GetAwaiter().GetResult();
var message = x.Storage.GetConnection().GetReceivedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
});

Routes.AddPublishBatchCommand(
"/published/requeue",
(client, messageId) =>
client.Storage.GetConnection().ChangePublishedState(messageId, StatusName.Scheduled));
{
var msg = client.Storage.GetConnection().GetPublishedMessageAsync(messageId)
.GetAwaiter().GetResult();
client.RequestServices.GetService<IDispatcher>().EnqueueToPublish(msg);
});
Routes.AddPublishBatchCommand(
"/received/requeue",
(client, messageId) =>
client.Storage.GetConnection().ChangeReceivedState(messageId, StatusName.Scheduled));
{
var msg = client.Storage.GetConnection().GetReceivedMessageAsync(messageId)
.GetAwaiter().GetResult();
client.RequestServices.GetService<IDispatcher>().EnqueueToExecute(msg);
});

Routes.AddRazorPage(
"/published/(?<StatusName>.+)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,27 @@ public class LocalRequestsOnlyAuthorizationFilter : IDashboardAuthorizationFilte
{
public bool Authorize(DashboardContext context)
{
var ipAddress = context.Request.RemoteIpAddress;
// if unknown, assume not local
if (string.IsNullOrEmpty(context.Request.RemoteIpAddress))
if (string.IsNullOrEmpty(ipAddress))
{
return false;
}

// check if localhost
if (context.Request.RemoteIpAddress == "127.0.0.1" || context.Request.RemoteIpAddress == "::1")
if (ipAddress == "127.0.0.1" || ipAddress == "0.0.0.1")
{
return true;
}

// compare with local address
if (context.Request.RemoteIpAddress == context.Request.LocalIpAddress)
if (ipAddress == context.Request.LocalIpAddress)
{
return true;
}

// check if private ip
if (Helper.IsInnerIP(context.Request.RemoteIpAddress))
if (Helper.IsInnerIP(ipAddress))
{
return true;
}
Expand Down
94 changes: 58 additions & 36 deletions src/DotNetCore.CAP/IPublishMessageSender.Base.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ protected BasePublishMessageSender(
public abstract Task<OperateResult> PublishAsync(string keyName, string content);

public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{
bool retry;
OperateResult result;
do
{
var executedResult = await SendWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = executedResult.Item1;
} while (retry);

return result;
}

private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
Expand All @@ -63,67 +81,71 @@ public async Task<OperateResult> SendAsync(CapPublishedMessage message)

TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);

return OperateResult.Success;
return (false, OperateResult.Success);
}
else
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);

await SetFailedState(message, result.Exception, out bool stillRetry);

if (stillRetry)
{
_logger.SenderRetrying(message.Id, message.Retries);

await SendAsync(message);
}
return OperateResult.Failed(result.Exception);
var needRetry = await SetFailedState(message, result.Exception);
return (needRetry, OperateResult.Failed(result.Exception));
}
}

private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

return true;
}

private Task SetSuccessfulState(CapPublishedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);

return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
private async Task<bool> SetFailedState(CapPublishedMessage message, Exception ex)
{
IState newState = new FailedState();
stillRetry = UpdateMessageForRetryAsync(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}

AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, newState, _connection);
var needRetry = UpdateMessageForRetry(message);

await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

return needRetry;
}

private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
{
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}

private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));

var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);

_logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}

_logger.SenderRetrying(message.Id, retries);

return true;
}

private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();
Expand Down
Loading
0